You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/09 21:56:11 UTC
svn commit: r1466196 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/
hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/
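hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/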
Author: vinodkv
Date: Tue Apr 9 19:56:10 2013
New Revision: 1466196
URL: http://svn.apache.org/r1466196
Log:
YARN-112. Fixed a race condition during localization that fails containers. Contributed by Omkar Vinit Joshi.
MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. Contributed by Omkar Vinit Joshi.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Apr 9 19:56:10 2013
@@ -208,6 +208,9 @@ Release 2.0.5-beta - UNRELEASED
local directory hits unix file count limits and thus prevent job failures.
(Omkar Vinit Joshi via vinodkv)
+ YARN-112. Fixed a race condition during localization that fails containers.
+ (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Tue Apr 9 19:56:10 2013
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
@@ -36,13 +35,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Download a single URL to the local disk.
@@ -51,8 +49,7 @@ import org.apache.hadoop.yarn.util.Conve
public class FSDownload implements Callable<Path> {
private static final Log LOG = LogFactory.getLog(FSDownload.class);
-
- private Random rand;
+
private FileContext files;
private final UserGroupInformation userUgi;
private Configuration conf;
@@ -71,13 +68,12 @@ public class FSDownload implements Calla
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
- Path destDirPath, LocalResource resource, Random rand) {
+ Path destDirPath, LocalResource resource) {
this.conf = conf;
this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
- this.rand = rand;
}
LocalResource getResource() {
@@ -270,11 +266,6 @@ public class FSDownload implements Calla
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
- Path tmp;
- do {
- tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
- } while (files.util().exists(tmp));
- destDirPath = tmp;
createDir(destDirPath, cachePerms);
final Path dst_work = new Path(destDirPath + "_tmp");
createDir(dst_work, cachePerms);
@@ -305,8 +296,6 @@ public class FSDownload implements Calla
files.delete(dst_work, true);
} catch (FileNotFoundException ignore) {
}
- // clear ref to internal var
- rand = null;
conf = null;
resource = null;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Tue Apr 9 19:56:10 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
@@ -66,6 +67,8 @@ import org.junit.Test;
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
+ private static AtomicLong uniqueNumberGenerator =
+ new AtomicLong(System.currentTimeMillis());
@AfterClass
public static void deleteTestDir() throws IOException {
@@ -267,9 +270,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), size, conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- destPath, rsrc, new Random(sharedSeed));
+ destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
try {
@@ -320,9 +325,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), sizes[i], conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- destPath, rsrc, new Random(sharedSeed));
+ destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
}
@@ -380,9 +387,10 @@ public class TestFSDownload {
Path p = new Path(basedir, "" + 1);
LocalResource rsrc = createTarFile(files, p, size, rand, vis);
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd = new FSDownload(files,
- UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
- new Random(sharedSeed));
+ UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
try {
@@ -437,9 +445,10 @@ public class TestFSDownload {
LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
rsrcjar.setType(LocalResourceType.PATTERN);
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ destPathjar = new Path (destPathjar,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsdjar = new FSDownload(files,
- UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
- new Random(sharedSeed));
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
pending.put(rsrcjar, exec.submit(fsdjar));
try {
@@ -493,9 +502,10 @@ public class TestFSDownload {
Path p = new Path(basedir, "" + 1);
LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ destPathjar = new Path (destPathjar,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsdzip = new FSDownload(files,
- UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
- new Random(sharedSeed));
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
pending.put(rsrczip, exec.submit(fsdzip));
try {
@@ -586,9 +596,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), conf);
+ destPath = new Path (destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- destPath, rsrc, new Random(sharedSeed));
+ destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
}
@@ -614,4 +626,38 @@ public class TestFSDownload {
}
-}
+
+ @Test(timeout = 1000)
+ public void testUniqueDestinationPath() throws Exception {
+ Configuration conf = new Configuration();
+ FileContext files = FileContext.getLocalFSFileContext(conf);
+ final Path basedir = files.makeQualified(new Path("target",
+ TestFSDownload.class.getSimpleName()));
+ files.mkdir(basedir, null, true);
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+ ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
+
+ LocalDirAllocator dirs =
+ new LocalDirAllocator(TestFSDownload.class.getName());
+ Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf);
+ destPath =
+ new Path(destPath, Long.toString(uniqueNumberGenerator
+ .incrementAndGet()));
+ try {
+ Path p = new Path(basedir, "dir" + 0 + ".jar");
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+ LocalResource rsrc = createJar(files, p, vis);
+ FSDownload fsd =
+ new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
+ destPath, rsrc);
+ Future<Path> rPath = singleThreadedExec.submit(fsd);
+ // FSDownload no longer creates a random subdirectory to localize the
+ // resource, so the localized path must be the destination directory
+ // passed to the constructor followed by the file name.
+ Assert.assertEquals(destPath, rPath.get().getParent());
+ } finally {
+ singleThreadedExec.shutdown();
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue Apr 9 19:56:10 2013
@@ -198,7 +198,7 @@ public class ContainerLocalizer {
Callable<Path> download(Path path, LocalResource rsrc,
UserGroupInformation ugi) throws IOException {
DiskChecker.checkDir(new File(path.toUri().getRawPath()));
- return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
+ return new FSDownload(lfs, ugi, conf, path, rsrc);
}
static long getEstimatedSize(LocalResource rsrc) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Tue Apr 9 19:56:10 2013
@@ -43,4 +43,5 @@ interface LocalResourcesTracker
// TODO: Remove this in favour of EventHandler.handle
void localizationCompleted(LocalResourceRequest req, boolean success);
+ long nextUniqueNumber();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Apr 9 19:56:10 2013
@@ -21,6 +21,7 @@ import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -66,6 +67,12 @@ class LocalResourcesTrackerImpl implemen
*/
private ConcurrentHashMap<LocalResourceRequest, Path>
inProgressLocalResourcesMap;
+ /*
+ * Starts at 9 so that the first incrementAndGet() returns 10, leaving names
+ * 0-9 for the directories created by LocalCacheDirectoryManager. Each tracker
+ * has its own generator, i.e. one per APPLICATION, USER and PUBLIC cache.
+ */
+ private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
boolean useLocalCacheDirectoryManager, Configuration conf) {
@@ -283,4 +290,9 @@ class LocalResourcesTrackerImpl implemen
}
}
}
+
+ @Override
+ public long nextUniqueNumber() {
+ return uniqueNumberGenerator.incrementAndGet();
+ }
}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Apr 9 19:56:10 2013
@@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -666,8 +665,11 @@ public class ResourceLocalizationService
DiskChecker.checkDir(
new File(publicDirDestPath.toUri().getPath()));
}
+ publicDirDestPath =
+ new Path(publicDirDestPath, Long.toString(publicRsrc
+ .nextUniqueNumber()));
pending.put(queue.submit(new FSDownload(
- lfs, null, conf, publicDirDestPath, resource, new Random())),
+ lfs, null, conf, publicDirDestPath, resource)),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
} catch (IOException e) {
@@ -955,9 +957,9 @@ public class ResourceLocalizationService
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
- return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath);
-
+ return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
}
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1466196&r1=1466195&r2=1466196&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Apr 9 19:56:10 2013
@@ -520,7 +520,10 @@ public class TestResourceLocalizationSer
new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
URL localizedPath =
response.getResourceSpecs().get(0).getDestinationDirectory();
- assertTrue(localizedPath.getFile().endsWith(localPath));
+ // The unique number (10) generated by LocalResourcesTracker is appended
+ // to the local path.
+ assertTrue(localizedPath.getFile().endsWith(
+ localPath + Path.SEPARATOR + "10"));
// get second resource
response = spyService.heartbeat(stat);
@@ -534,7 +537,7 @@ public class TestResourceLocalizationSer
// LocalCacheDirectoryManager will be used and we have restricted number
// of files per directory to 1.
assertTrue(localizedPath.getFile().endsWith(
- localPath + Path.SEPARATOR + "0"));
+ localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
// empty rsrc
response = spyService.heartbeat(stat);