You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/04/10 22:17:54 UTC
svn commit: r1466658 [1/2] - in
/hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java...
Author: szetszwo
Date: Wed Apr 10 20:17:39 2013
New Revision: 1466658
URL: http://svn.apache.org/r1466658
Log:
Merge r1464808 through r1466652 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
- copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
- copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
- copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/InvalidResourceRequestException.java
- copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/InvalidResourceRequestException.java
Modified:
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt Wed Apr 10 20:17:39 2013
@@ -25,6 +25,12 @@ Trunk - Unreleased
YARN-491. TestContainerLogsPage fails on Windows. (Chris Nauroth via hitesh)
+ YARN-557. Fix TestUnmanagedAMLauncher failure on Windows. (Chris Nauroth via
+ vinodkv)
+
+ YARN-524 TestYarnVersionInfo failing if generated properties doesn't
+ include an SVN URL. (stevel)
+
BREAKDOWN OF HADOOP-8562 SUBTASKS
YARN-158. Yarn creating package-info.java must not depend on sh.
@@ -55,9 +61,6 @@ Trunk - Unreleased
YARN-359. Fixing commands for container signalling in Windows. (Chris Nauroth
via vinodkv)
- YARN-524 TestYarnVersionInfo failing if generated properties doesn't
- include an SVN URL(stevel)
-
Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
@@ -123,6 +126,12 @@ Release 2.0.5-beta - UNRELEASED
YARN-458. YARN daemon addresses must be placed in many different configs.
(sandyr via tucu)
+ YARN-193. Scheduler.normalizeRequest does not account for allocation
+ requests that exceed maximumAllocation limits (Zhijie Shen via bikas)
+
+ YARN-479. NM retry behavior for connection to RM should be similar for
+ lost heartbeats (Jian He via bikas)
+
OPTIMIZATIONS
BUG FIXES
@@ -195,6 +204,16 @@ Release 2.0.5-beta - UNRELEASED
to implement closeable so that they can be stopped when needed via
RPC.stopProxy(). (Siddharth Seth via vinodkv)
+ YARN-99. Modify private distributed cache to localize files such that no
+ local directory hits unix file count limits and thus prevent job failures.
+ (Omkar Vinit Joshi via vinodkv)
+
+ YARN-112. Fixed a race condition during localization that fails containers.
+ (Omkar Vinit Joshi via vinodkv)
+
+ YARN-534. Change RM restart recovery to also account for AM max-attempts
+ configuration after the restart. (Jian He via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
@@ -509,6 +528,8 @@ Release 0.23.7 - UNRELEASED
YARN-200. yarn log does not output all needed information, and is in a
binary format (Ravi Prakash via jlowe)
+ YARN-525. make CS node-locality-delay refreshable (Thomas Graves via jlowe)
+
OPTIMIZATIONS
YARN-357. App submission should not be synchronized (daryn)
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java Wed Apr 10 20:17:39 2013
@@ -103,7 +103,7 @@ public interface ApplicationConstants {
* $USER
* Final, non-modifiable.
*/
- USER("USER"),
+ USER(Shell.WINDOWS ? "USERNAME": "USER"),
/**
* $LOGNAME
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Wed Apr 10 20:17:39 2013
@@ -683,7 +683,7 @@ public class ApplicationMaster {
ctx.setResource(container.getResource());
String jobUserName = System.getenv(ApplicationConstants.Environment.USER
- .name());
+ .key());
ctx.setUser(jobUserName);
LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Wed Apr 10 20:17:39 2013
@@ -30,6 +30,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass;
@@ -50,7 +51,7 @@ public class TestUnmanagedAMLauncher {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(
- TestUnmanagedAMLauncher.class.getName(), 1, 1, 1);
+ TestUnmanagedAMLauncher.class.getSimpleName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
URL url = Thread.currentThread().getContextClassLoader()
@@ -93,7 +94,7 @@ public class TestUnmanagedAMLauncher {
return envClassPath;
}
- @Test(timeout=10000)
+ @Test(timeout=30000)
public void testDSShell() throws Exception {
String classpath = getTestRuntimeClasspath();
String javaHome = System.getenv("JAVA_HOME");
@@ -112,7 +113,8 @@ public class TestUnmanagedAMLauncher {
javaHome
+ "/bin/java -Xmx512m "
+ "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
- + "--container_memory 128 --num_containers 1 --priority 0 --shell_command ls" };
+ + "--container_memory 128 --num_containers 1 --priority 0 "
+ + "--shell_command " + (Shell.WINDOWS ? "dir" : "ls") };
LOG.info("Initializing Launcher");
UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Apr 10 20:17:39 2013
@@ -122,9 +122,9 @@ public class YarnConfiguration extends C
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_MB =
YARN_PREFIX + "scheduler.maximum-allocation-mb";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192;
- public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES =
+ public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES =
YARN_PREFIX + "scheduler.maximum-allocation-vcores";
- public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES = 32;
+ public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Wed Apr 10 20:17:39 2013
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
@@ -36,13 +35,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Download a single URL to the local disk.
@@ -51,8 +49,7 @@ import org.apache.hadoop.yarn.util.Conve
public class FSDownload implements Callable<Path> {
private static final Log LOG = LogFactory.getLog(FSDownload.class);
-
- private Random rand;
+
private FileContext files;
private final UserGroupInformation userUgi;
private Configuration conf;
@@ -71,13 +68,12 @@ public class FSDownload implements Calla
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
- Path destDirPath, LocalResource resource, Random rand) {
+ Path destDirPath, LocalResource resource) {
this.conf = conf;
this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
- this.rand = rand;
}
LocalResource getResource() {
@@ -270,11 +266,6 @@ public class FSDownload implements Calla
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
- Path tmp;
- do {
- tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
- } while (files.util().exists(tmp));
- destDirPath = tmp;
createDir(destDirPath, cachePerms);
final Path dst_work = new Path(destDirPath + "_tmp");
createDir(dst_work, cachePerms);
@@ -305,8 +296,6 @@ public class FSDownload implements Calla
files.delete(dst_work, true);
} catch (FileNotFoundException ignore) {
}
- // clear ref to internal var
- rand = null;
conf = null;
resource = null;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Wed Apr 10 20:17:39 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
@@ -66,6 +67,8 @@ import org.junit.Test;
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
+ private static AtomicLong uniqueNumberGenerator =
+ new AtomicLong(System.currentTimeMillis());
@AfterClass
public static void deleteTestDir() throws IOException {
@@ -267,9 +270,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), size, conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- destPath, rsrc, new Random(sharedSeed));
+ destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
try {
@@ -320,9 +325,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), sizes[i], conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- destPath, rsrc, new Random(sharedSeed));
+ destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
}
@@ -380,9 +387,10 @@ public class TestFSDownload {
Path p = new Path(basedir, "" + 1);
LocalResource rsrc = createTarFile(files, p, size, rand, vis);
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd = new FSDownload(files,
- UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
- new Random(sharedSeed));
+ UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
try {
@@ -437,9 +445,10 @@ public class TestFSDownload {
LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
rsrcjar.setType(LocalResourceType.PATTERN);
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ destPathjar = new Path (destPathjar,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsdjar = new FSDownload(files,
- UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
- new Random(sharedSeed));
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
pending.put(rsrcjar, exec.submit(fsdjar));
try {
@@ -493,9 +502,10 @@ public class TestFSDownload {
Path p = new Path(basedir, "" + 1);
LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ destPathjar = new Path (destPathjar,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsdzip = new FSDownload(files,
- UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
- new Random(sharedSeed));
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
pending.put(rsrczip, exec.submit(fsdzip));
try {
@@ -586,9 +596,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- destPath, rsrc, new Random(sharedSeed));
+ destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
}
@@ -614,4 +626,38 @@ public class TestFSDownload {
}
-}
+
+ @Test(timeout = 1000)
+ public void testUniqueDestinationPath() throws Exception {
+ Configuration conf = new Configuration();
+ FileContext files = FileContext.getLocalFSFileContext(conf);
+ final Path basedir = files.makeQualified(new Path("target",
+ TestFSDownload.class.getSimpleName()));
+ files.mkdir(basedir, null, true);
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+ ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
+
+ LocalDirAllocator dirs =
+ new LocalDirAllocator(TestFSDownload.class.getName());
+ Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf);
+ destPath =
+ new Path(destPath, Long.toString(uniqueNumberGenerator
+ .incrementAndGet()));
+ try {
+ Path p = new Path(basedir, "dir" + 0 + ".jar");
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+ LocalResource rsrc = createJar(files, p, vis);
+ FSDownload fsd =
+ new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
+ destPath, rsrc);
+ Future<Path> rPath = singleThreadedExec.submit(fsd);
+ // Now FSDownload will not create a random directory to localize the
+ // resource. Therefore the final localizedPath for the resource should be
+ // destination directory (passed as an argument) + file name.
+ Assert.assertEquals(destPath, rPath.get().getParent());
+ } finally {
+ singleThreadedExec.shutdown();
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Apr 10 20:17:39 2013
@@ -87,10 +87,9 @@ public class NodeStatusUpdaterImpl exten
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
-
- private boolean previousHeartBeatSucceeded;
- private List<ContainerStatus> previousContainersStatuses =
- new ArrayList<ContainerStatus>();
+ private long rmConnectWaitMS;
+ private long rmConnectionRetryIntervalMS;
+ private boolean waitForEver;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -99,7 +98,6 @@ public class NodeStatusUpdaterImpl exten
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
- this.previousHeartBeatSucceeded = true;
}
@Override
@@ -137,8 +135,8 @@ public class NodeStatusUpdaterImpl exten
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
LOG.info("Initialized nodemanager for " + nodeId + ":" +
- " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
- " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
+ " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
+ " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
super.init(conf);
}
@@ -192,12 +190,12 @@ public class NodeStatusUpdaterImpl exten
private void registerWithRM() throws YarnRemoteException {
Configuration conf = getConfig();
- long rmConnectWaitMS =
+ rmConnectWaitMS =
conf.getInt(
YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
* 1000;
- long rmConnectionRetryIntervalMS =
+ rmConnectionRetryIntervalMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
YarnConfiguration
@@ -210,7 +208,7 @@ public class NodeStatusUpdaterImpl exten
" should not be negative.");
}
- boolean waitForEver = (rmConnectWaitMS == -1000);
+ waitForEver = (rmConnectWaitMS == -1000);
if(! waitForEver) {
if(rmConnectWaitMS < 0) {
@@ -319,14 +317,8 @@ public class NodeStatusUpdaterImpl exten
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
- List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
- if(previousHeartBeatSucceeded) {
- previousContainersStatuses.clear();
- } else {
- containersStatuses.addAll(previousContainersStatuses);
- }
-
int numActiveContainers = 0;
+ List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
@@ -341,7 +333,6 @@ public class NodeStatusUpdaterImpl exten
LOG.info("Sending out status for container: " + containerStatus);
if (containerStatus.getState() == ContainerState.COMPLETE) {
- previousContainersStatuses.add(containerStatus);
// Remove
i.remove();
@@ -404,6 +395,9 @@ public class NodeStatusUpdaterImpl exten
while (!isStopped) {
// Send heartbeat
try {
+ NodeHeartbeatResponse response = null;
+ int rmRetryCount = 0;
+ long waitStartTime = System.currentTimeMillis();
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
@@ -414,9 +408,31 @@ public class NodeStatusUpdaterImpl exten
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey());
}
- NodeHeartbeatResponse response =
- resourceTracker.nodeHeartbeat(request);
- previousHeartBeatSucceeded = true;
+ while (!isStopped) {
+ try {
+ rmRetryCount++;
+ response = resourceTracker.nodeHeartbeat(request);
+ break;
+ } catch (Throwable e) {
+ LOG.warn("Trying to heartbeat to ResourceManager, "
+ + "current no. of failed attempts is " + rmRetryCount);
+ if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+ || waitForEver) {
+ try {
+ LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+ + " seconds before next heartbeat to RM");
+ Thread.sleep(rmConnectionRetryIntervalMS);
+ } catch(InterruptedException ex) {
+ //done nothing
+ }
+ } else {
+ String errorMessage = "Failed to heartbeat to RM, " +
+ "no. of failed attempts is "+rmRetryCount;
+ LOG.error(errorMessage,e);
+ throw new YarnException(errorMessage,e);
+ }
+ }
+ }
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
// See if the master-key has rolled over
@@ -432,7 +448,7 @@ public class NodeStatusUpdaterImpl exten
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
LOG
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
- " hence shutting down.");
+ " hence shutting down.");
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
@@ -461,8 +477,12 @@ public class NodeStatusUpdaterImpl exten
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
+ } catch (YarnException e) {
+ //catch and throw the exception if tried MAX wait time to connect RM
+ dispatcher.getEventHandler().handle(
+ new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+ throw e;
} catch (Throwable e) {
- previousHeartBeatSucceeded = false;
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java Wed Apr 10 20:17:39 2013
@@ -18,18 +18,13 @@
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.*;
public interface LocalizerHeartbeatResponse {
- public LocalizerAction getLocalizerAction();
- public List<LocalResource> getAllResources();
- public LocalResource getLocalResource(int i);
+ public LocalizerAction getLocalizerAction();
public void setLocalizerAction(LocalizerAction action);
- public void addAllResources(List<LocalResource> resources);
- public void addResource(LocalResource resource);
- public void removeResource(int index);
- public void clearResources();
-}
+ public List<ResourceLocalizationSpec> getResourceSpecs();
+ public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs);
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java Wed Apr 10 20:17:39 2013
@@ -21,13 +21,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+
import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponseP
LocalizerHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
- private List<LocalResource> resources;
+ private List<ResourceLocalizationSpec> resourceSpecs;
public LocalizerHeartbeatResponsePBImpl() {
builder = LocalizerHeartbeatResponseProto.newBuilder();
}
- public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
+ public LocalizerHeartbeatResponsePBImpl(
+ LocalizerHeartbeatResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@@ -59,7 +61,7 @@ public class LocalizerHeartbeatResponseP
}
private void mergeLocalToBuilder() {
- if (resources != null) {
+ if (resourceSpecs != null) {
addResourcesToProto();
}
}
@@ -79,6 +81,7 @@ public class LocalizerHeartbeatResponseP
viaProto = false;
}
+ @Override
public LocalizerAction getLocalizerAction() {
LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasAction()) {
@@ -87,14 +90,10 @@ public class LocalizerHeartbeatResponseP
return convertFromProtoFormat(p.getAction());
}
- public List<LocalResource> getAllResources() {
- initResources();
- return this.resources;
- }
-
- public LocalResource getLocalResource(int i) {
+ @Override
+ public List<ResourceLocalizationSpec> getResourceSpecs() {
initResources();
- return this.resources.get(i);
+ return this.resourceSpecs;
}
public void setLocalizerAction(LocalizerAction action) {
@@ -106,31 +105,39 @@ public class LocalizerHeartbeatResponseP
builder.setAction(convertToProtoFormat(action));
}
+ public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs) {
+ maybeInitBuilder();
+ if (rsrcs == null) {
+ builder.clearResources();
+ return;
+ }
+ this.resourceSpecs = rsrcs;
+ }
+
private void initResources() {
- if (this.resources != null) {
+ if (this.resourceSpecs != null) {
return;
}
LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<LocalResourceProto> list = p.getResourcesList();
- this.resources = new ArrayList<LocalResource>();
-
- for (LocalResourceProto c : list) {
- this.resources.add(convertFromProtoFormat(c));
+ List<ResourceLocalizationSpecProto> list = p.getResourcesList();
+ this.resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
+ for (ResourceLocalizationSpecProto c : list) {
+ this.resourceSpecs.add(convertFromProtoFormat(c));
}
}
private void addResourcesToProto() {
maybeInitBuilder();
builder.clearResources();
- if (this.resources == null)
+ if (this.resourceSpecs == null)
return;
- Iterable<LocalResourceProto> iterable =
- new Iterable<LocalResourceProto>() {
+ Iterable<ResourceLocalizationSpecProto> iterable =
+ new Iterable<ResourceLocalizationSpecProto>() {
@Override
- public Iterator<LocalResourceProto> iterator() {
- return new Iterator<LocalResourceProto>() {
+ public Iterator<ResourceLocalizationSpecProto> iterator() {
+ return new Iterator<ResourceLocalizationSpecProto>() {
- Iterator<LocalResource> iter = resources.iterator();
+ Iterator<ResourceLocalizationSpec> iter = resourceSpecs.iterator();
@Override
public boolean hasNext() {
@@ -138,8 +145,10 @@ public class LocalizerHeartbeatResponseP
}
@Override
- public LocalResourceProto next() {
- return convertToProtoFormat(iter.next());
+ public ResourceLocalizationSpecProto next() {
+ ResourceLocalizationSpec resource = iter.next();
+
+ return ((ResourceLocalizationSpecPBImpl)resource).getProto();
}
@Override
@@ -154,34 +163,10 @@ public class LocalizerHeartbeatResponseP
builder.addAllResources(iterable);
}
- public void addAllResources(List<LocalResource> resources) {
- if (resources == null)
- return;
- initResources();
- this.resources.addAll(resources);
- }
- public void addResource(LocalResource resource) {
- initResources();
- this.resources.add(resource);
- }
-
- public void removeResource(int index) {
- initResources();
- this.resources.remove(index);
- }
-
- public void clearResources() {
- initResources();
- this.resources.clear();
- }
-
- private LocalResource convertFromProtoFormat(LocalResourceProto p) {
- return new LocalResourcePBImpl(p);
- }
-
- private LocalResourceProto convertToProtoFormat(LocalResource s) {
- return ((LocalResourcePBImpl)s).getProto();
+ private ResourceLocalizationSpec convertFromProtoFormat(
+ ResourceLocalizationSpecProto p) {
+ return new ResourceLocalizationSpecPBImpl(p);
}
private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
@@ -191,5 +176,4 @@ public class LocalizerHeartbeatResponseP
private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
return LocalizerAction.valueOf(a.name());
}
-
-}
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Wed Apr 10 20:17:39 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
@@ -89,8 +91,6 @@ public class ContainerLocalizer {
private final String localizerId;
private final FileContext lfs;
private final Configuration conf;
- private final LocalDirAllocator appDirs;
- private final LocalDirAllocator userDirs;
private final RecordFactory recordFactory;
private final Map<LocalResource,Future<Path>> pendingResources;
private final String appCacheDirContextName;
@@ -112,8 +112,6 @@ public class ContainerLocalizer {
this.recordFactory = recordFactory;
this.conf = new Configuration();
this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
- this.appDirs = new LocalDirAllocator(appCacheDirContextName);
- this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user));
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
}
@@ -197,10 +195,10 @@ public class ContainerLocalizer {
return new ExecutorCompletionService<Path>(exec);
}
- Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
+ Callable<Path> download(Path path, LocalResource rsrc,
UserGroupInformation ugi) throws IOException {
- Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
- return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
+ DiskChecker.checkDir(new File(path.toUri().getRawPath()));
+ return new FSDownload(lfs, ugi, conf, path, rsrc);
}
static long getEstimatedSize(LocalResource rsrc) {
@@ -238,25 +236,12 @@ public class ContainerLocalizer {
LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
switch (response.getLocalizerAction()) {
case LIVE:
- List<LocalResource> newResources = response.getAllResources();
- for (LocalResource r : newResources) {
- if (!pendingResources.containsKey(r)) {
- final LocalDirAllocator lda;
- switch (r.getVisibility()) {
- default:
- LOG.warn("Unknown visibility: " + r.getVisibility()
- + ", Using userDirs");
- //Falling back to userDirs for unknown visibility.
- case PUBLIC:
- case PRIVATE:
- lda = userDirs;
- break;
- case APPLICATION:
- lda = appDirs;
- break;
- }
- // TODO: Synchronization??
- pendingResources.put(r, cs.submit(download(lda, r, ugi)));
+ List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
+ for (ResourceLocalizationSpec newRsrc : newRsrcs) {
+ if (!pendingResources.containsKey(newRsrc.getResource())) {
+ pendingResources.put(newRsrc.getResource(), cs.submit(download(
+ new Path(newRsrc.getDestinationDirectory().getFile()),
+ newRsrc.getResource(), ugi)));
}
}
break;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Wed Apr 10 20:17:39 2013
@@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Wed Apr 10 20:17:39 2013
@@ -43,4 +43,5 @@ interface LocalResourcesTracker
// TODO: Remove this in favour of EventHandler.handle
void localizationCompleted(LocalResourceRequest req, boolean success);
+ long nextUniqueNumber();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Wed Apr 10 20:17:39 2013
@@ -21,6 +21,7 @@ import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -66,6 +67,12 @@ class LocalResourcesTrackerImpl implemen
*/
private ConcurrentHashMap<LocalResourceRequest, Path>
inProgressLocalResourcesMap;
+ /*
+ * starting with 10 to accommodate 0-9 directories created as a part of
+ * LocalCacheDirectoryManager. So there will be one unique number generator
+ * per APPLICATION, USER and PUBLIC cache.
+ */
+ private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
boolean useLocalCacheDirectoryManager, Configuration conf) {
@@ -283,4 +290,9 @@ class LocalResourcesTrackerImpl implemen
}
}
}
+
+ @Override
+ public long nextUniqueNumber() {
+ return uniqueNumberGenerator.incrementAndGet();
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Apr 10 20:17:39 2013
@@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -80,10 +79,12 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -326,7 +328,7 @@ public class ResourceLocalizationService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
- dispatcher, false, super.getConfig()));
+ dispatcher, true, super.getConfig()));
if (null != appRsrc.putIfAbsent(
ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
@@ -476,6 +478,21 @@ public class ResourceLocalizationService
}
}
+ private String getUserFileCachePath(String user) {
+ String path =
+ "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
+ return path;
+ }
+
+ private String getUserAppCachePath(String user, String appId) {
+ String path =
+ "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+ + Path.SEPARATOR + appId;
+ return path;
+ }
+
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@@ -648,8 +665,11 @@ public class ResourceLocalizationService
DiskChecker.checkDir(
new File(publicDirDestPath.toUri().getPath()));
}
+ publicDirDestPath =
+ new Path(publicDirDestPath, Long.toString(publicRsrc
+ .nextUniqueNumber()));
pending.put(queue.submit(new FSDownload(
- lfs, null, conf, publicDirDestPath, resource, new Random())),
+ lfs, null, conf, publicDirDestPath, resource)),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
} catch (IOException e) {
@@ -803,7 +823,20 @@ public class ResourceLocalizationService
LocalResource next = findNextResource();
if (next != null) {
response.setLocalizerAction(LocalizerAction.LIVE);
- response.addResource(next);
+ try {
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+ ResourceLocalizationSpec rsrc =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(rsrc);
+ response.setResourceSpecs(rsrcs);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be found."
+ + "Disks might have failed.", e);
+ } catch (URISyntaxException e) {
+ // TODO fail? Already translated several times...
+ }
} else if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
@@ -812,7 +845,8 @@ public class ResourceLocalizationService
}
return response;
}
-
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null;
@@ -835,6 +869,7 @@ public class ResourceLocalizationService
new ResourceLocalizedEvent(req,
ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
stat.getLocalSize()));
+ localizationCompleted(stat);
} catch (URISyntaxException e) { }
if (pending.isEmpty()) {
// TODO: Synchronization
@@ -844,7 +879,17 @@ public class ResourceLocalizationService
response.setLocalizerAction(LocalizerAction.LIVE);
LocalResource next = findNextResource();
if (next != null) {
- response.addResource(next);
+ try {
+ ResourceLocalizationSpec resource =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(resource);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be " +
+ "found. Disks might have failed.", e);
+ } catch (URISyntaxException e) {
+ //TODO fail? Already translated several times...
+ }
}
break;
case FETCH_PENDING:
@@ -854,6 +899,7 @@ public class ResourceLocalizationService
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
+ localizationCompleted(stat);
// TODO: Why is this event going directly to the container. Why not
// the resource itself? What happens to the resource? Is it removed?
dispatcher.getEventHandler().handle(
@@ -869,9 +915,53 @@ public class ResourceLocalizationService
break;
}
}
+ response.setResourceSpecs(rsrcs);
return response;
}
+ private void localizationCompleted(LocalResourceStatus stat) {
+ try {
+ LocalResource rsrc = stat.getResource();
+ LocalResourceRequest key = new LocalResourceRequest(rsrc);
+ String user = context.getUser();
+ ApplicationId appId =
+ context.getContainerId().getApplicationAttemptId()
+ .getApplicationId();
+ LocalResourceVisibility vis = rsrc.getVisibility();
+ LocalResourcesTracker tracker =
+ getLocalResourcesTracker(vis, user, appId);
+ if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
+ tracker.localizationCompleted(key, true);
+ } else {
+ tracker.localizationCompleted(key, false);
+ }
+ } catch (URISyntaxException e) {
+ LOG.error("Invalid resource URL specified", e);
+ }
+ }
+
+ private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+ URISyntaxException {
+ String user = context.getUser();
+ ApplicationId appId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
+ LocalResourceVisibility vis = rsrc.getVisibility();
+ LocalResourcesTracker tracker =
+ getLocalResourcesTracker(vis, user, appId);
+ String cacheDirectory = null;
+ if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
+ cacheDirectory = getUserFileCachePath(user);
+ } else {// APPLICATION ONLY
+ cacheDirectory = getUserAppCachePath(user, appId.toString());
+ }
+ Path dirPath =
+ dirsHandler.getLocalPathForWrite(cacheDirectory,
+ ContainerLocalizer.getEstimatedSize(rsrc), false);
+ dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath);
+ return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+ }
+
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
@@ -1033,4 +1123,4 @@ public class ResourceLocalizationService
del.delete(null, dirPath, new Path[] {});
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Wed Apr 10 20:17:39 2013
@@ -47,7 +47,12 @@ enum LocalizerActionProto {
DIE = 2;
}
+message ResourceLocalizationSpecProto { // Pairs one resource with a destination directory.
+ optional LocalResourceProto resource = 1; // the resource this spec refers to
+ optional URLProto destination_directory = 2; // presumably where the localizer should place the resource -- confirm against ContainerLocalizer
+}
+
message LocalizerHeartbeatResponseProto {
optional LocalizerActionProto action = 1;
- repeated LocalResourceProto resources = 2;
+ repeated ResourceLocalizationSpecProto resources = 2; // NOTE(review): field number 2 is reused with a different message type, so peers built before and after this change would not agree on this field
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Apr 10 20:17:39 2013
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -51,9 +52,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,6 +170,10 @@ public class TestNodeStatusUpdater {
throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID);
+ NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+ Dispatcher mockDispatcher = mock(Dispatcher.class);
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -183,7 +190,8 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(firstContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(2);
- Container container = new ContainerImpl(conf , null, launchContext, null, null);
+ Container container = new ContainerImpl(conf , mockDispatcher,
+ launchContext, null, mockMetrics);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -207,7 +215,8 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(secondContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(3);
- Container container = new ContainerImpl(conf, null, launchContext, null, null);
+ Container container = new ContainerImpl(conf, mockDispatcher,
+ launchContext, null, mockMetrics);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -229,13 +238,14 @@ public class TestNodeStatusUpdater {
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
- public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+ public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
+ resourceTracker = new MyResourceTracker(this.context);
}
@Override
@@ -312,6 +322,21 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl { // NodeStatusUpdaterImpl whose RM client is a MyResourceTracker5
+ private ResourceTracker resourceTracker;
+
+ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MyResourceTracker5(); // assigned after super(...) returns
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() { // hands back the MyResourceTracker5 created in the constructor
+ return resourceTracker;
+ }
+ }
+
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -328,6 +353,32 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeManager2 extends NodeManager { // NodeManager that records that stop() ran and then waits on a shared barrier
+ public boolean isStopped = false; // set to true in stop(), after super.stop() returns
+ private NodeStatusUpdater nodeStatusUpdater;
+ private CyclicBarrier syncBarrier; // supplied by the test through the constructor
+ public MyNodeManager2 (CyclicBarrier syncBarrier) {
+ this.syncBarrier = syncBarrier;
+ }
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ nodeStatusUpdater =
+ new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+ metrics); // uses the updater whose heartbeat always throws (see MyResourceTracker5)
+ return nodeStatusUpdater;
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ isStopped = true; // written before the barrier wait below
+ try {
+ syncBarrier.await(); // waits at the barrier handed to the constructor
+ } catch (Exception e) { // NOTE(review): any failure of await() is silently dropped here
+ }
+ }
+ }
//
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -505,6 +556,26 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyResourceTracker5 implements ResourceTracker { // ResourceTracker stub: registration succeeds, every heartbeat throws
+ public NodeAction registerNodeAction = NodeAction.NORMAL; // action placed in the registration response
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(registerNodeAction ); // the request itself is not inspected
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ heartBeatID++; // counts the attempt before failing; heartBeatID is declared outside this class
+ throw RPCUtil.getRemoteException("NodeHeartbeat exception"); // never returns a response
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -883,6 +954,30 @@ public class TestNodeStatusUpdater {
nm.stop();
}
+ @Test(timeout = 20000)
+ public void testNodeStatusUpdaterRetryAndNMShutdown() // starts a MyNodeManager2 whose heartbeats always fail, then checks it was stopped
+ throws InterruptedException {
+ final long connectionWaitSecs = 1;
+ final long connectionRetryIntervalSecs = 1;
+ YarnConfiguration conf = createNMConfig();
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ connectionWaitSecs);
+ conf.setLong(YarnConfiguration
+ .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+ connectionRetryIntervalSecs);
+ CyclicBarrier syncBarrier = new CyclicBarrier(2); // two parties: this test thread and MyNodeManager2.stop()
+ nm = new MyNodeManager2(syncBarrier);
+ nm.init(conf);
+ nm.start();
+ try {
+ syncBarrier.await(); // released once stop() reaches the same barrier; what triggers stop() is not visible in this hunk
+ } catch (Exception e) { // NOTE(review): a failed await() is swallowed, and the assertions below still run
+ }
+ Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+ Assert.assertTrue("calculate heartBeatCount based on" +
+ " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); // presumably 2 follows from the 1s wait and 1s retry interval set above -- confirm against NodeStatusUpdaterImpl
+ }
+
private class MyNMContext extends NMContext {
ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Wed Apr 10 20:17:39 2013
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUti
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
import org.junit.Test;
-import static org.junit.Assert.*;
public class TestPBRecordImpl {
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
static LocalResource createResource() {
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
assertTrue(ret instanceof LocalResourcePBImpl);
- ret.setResource(
- ConverterUtils.getYarnUrlFromPath(
- new Path("hdfs://y.ak:8020/foo/bar")));
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+ "hdfs://y.ak:8020/foo/bar")));
ret.setSize(4344L);
ret.setTimestamp(3141592653589793L);
ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,27 @@ public class TestPBRecordImpl {
return ret;
}
- static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+ static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse()
+ throws URISyntaxException {
LocalizerHeartbeatResponse ret =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
ret.setLocalizerAction(LocalizerAction.LIVE);
- ret.addResource(createResource());
+ LocalResource rsrc = createResource();
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+ ResourceLocalizationSpec resource =
+ recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+ resource.setResource(rsrc); // the spec wraps the same LocalResource that createResource() builds
+ resource.setDestinationDirectory(ConverterUtils
+ .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis()))); // "/tmp" is concatenated directly with the current millis, so the path can differ between calls
+ rsrcs.add(resource);
+ ret.setResourceSpecs(rsrcs); // the response carries exactly one spec
+ System.out.println(resource); // NOTE(review): prints the spec to stdout on every call; looks like leftover debugging output
return ret;
}
- @Test
+ @Test(timeout=10000)
public void testLocalResourceStatusSerDe() throws Exception {
LocalResourceStatus rsrcS = createLocalResourceStatus();
assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
assertEquals(createResource(), rsrcD.getResource());
}
- @Test
+ @Test(timeout=10000)
public void testLocalizerStatusSerDe() throws Exception {
LocalizerStatus rsrcS = createLocalizerStatus();
assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
}
- @Test
+ @Test(timeout=10000)
public void testLocalizerHeartbeatResponseSerDe() throws Exception {
LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,8 @@ public class TestPBRecordImpl {
new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
assertEquals(rsrcS, rsrcD);
- assertEquals(createResource(), rsrcS.getLocalResource(0));
- assertEquals(createResource(), rsrcD.getLocalResource(0));
+ assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
+ assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java Wed Apr 10 20:17:39 2013
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.no
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatRespo
implements LocalizerHeartbeatResponse {
LocalizerAction action;
- List<LocalResource> rsrc;
+ List<ResourceLocalizationSpec> resourceSpecs;
MockLocalizerHeartbeatResponse() {
- rsrc = new ArrayList<LocalResource>();
+ resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
}
MockLocalizerHeartbeatResponse(
- LocalizerAction action, List<LocalResource> rsrc) {
+ LocalizerAction action, List<ResourceLocalizationSpec> resources) {
this.action = action;
- this.rsrc = rsrc;
+ this.resourceSpecs = resources;
}
public LocalizerAction getLocalizerAction() { return action; }
- public List<LocalResource> getAllResources() { return rsrc; }
- public LocalResource getLocalResource(int i) { return rsrc.get(i); }
public void setLocalizerAction(LocalizerAction action) {
this.action = action;
}
- public void addAllResources(List<LocalResource> resources) {
- rsrc.addAll(resources);
+
+ @Override
+ public List<ResourceLocalizationSpec> getResourceSpecs() {
+ return resourceSpecs;
+}
+
+ @Override
+ public void setResourceSpecs(List<ResourceLocalizationSpec> resourceSpecs) {
+ this.resourceSpecs = resourceSpecs;
}
- public void addResource(LocalResource resource) { rsrc.add(resource); }
- public void removeResource(int index) { rsrc.remove(index); }
- public void clearResources() { rsrc.clear(); }
}