You are viewing a plain-text version of this message; the link to its canonical version was lost when the page was converted to text.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/12/13 00:01:47 UTC
[5/8] tez git commit: TEZ-3876. Bug in local mode distributed cache files (Jacob Tolar via jeagles)
TEZ-3876. Bug in local mode distributed cache files (Jacob Tolar via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c378b44
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c378b44
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c378b44
Branch: refs/heads/branch-0.9.1
Commit: 4c378b443b20e1f643e894e81ec41271d0356b3f
Parents: cfede26
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Mon Dec 11 14:40:04 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Mon Dec 11 14:40:04 2017 -0600
----------------------------------------------------------------------
.../dag/app/launcher/TezLocalCacheManager.java | 60 ++++++-----
.../app/launcher/TestTezLocalCacheManager.java | 107 +++++++++++++++++++
2 files changed, 142 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4c378b44/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
index 80f73aa..45e5540 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
@@ -31,7 +31,9 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -90,15 +92,20 @@ public class TezLocalCacheManager {
throw new IllegalArgumentException("Resource type PATTERN not supported.");
}
- // submit task to download the object
- java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
- Path dest = new Path(downloadDir.toAbsolutePath().toString());
- FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
- Future<Path> downloadedPath = threadPool.submit(downloader);
-
// linkPath is the path we want to symlink the file/directory into
Path linkPath = new Path(cwd, entry.getKey());
- resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath));
+
+ if (resourceInfo.containsKey(resource)) {
+ // We've already downloaded this resource and just need to add another link.
+ resourceInfo.get(resource).linkPaths.add(linkPath);
+ } else {
+ // submit task to download the object
+ java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+ Path dest = new Path(downloadDir.toAbsolutePath().toString());
+ FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
+ Future<Path> downloadedPath = threadPool.submit(downloader);
+ resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath));
+ }
}
// Link each file
@@ -106,20 +113,21 @@ public class TezLocalCacheManager {
LocalResource resource = entry.getKey();
ResourceInfo resourceMeta = entry.getValue();
- Path linkPath = resourceMeta.linkPath;
- Path targetPath;
-
- try {
- // this blocks on the download completing
- targetPath = resourceMeta.downloadPath.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
-
- if (createSymlink(targetPath, linkPath)) {
- LOG.info("Localized file: {} as {}", resource, linkPath);
- } else {
- LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
+ for (Path linkPath : resourceMeta.linkPaths) {
+ Path targetPath;
+
+ try {
+ // this blocks on the download completing
+ targetPath = resourceMeta.downloadPath.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
+ }
+
+ if (createSymlink(targetPath, linkPath)) {
+ LOG.info("Localized file: {} as {}", resource, linkPath);
+ } else {
+ LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
+ }
}
}
} finally {
@@ -136,8 +144,10 @@ public class TezLocalCacheManager {
*/
public void cleanup() throws IOException {
for (ResourceInfo info : resourceInfo.values()) {
- if (fileContext.util().exists(info.linkPath)) {
- fileContext.delete(info.linkPath, true);
+ for (Path linkPath : info.linkPaths) {
+ if (fileContext.util().exists(linkPath)) {
+ fileContext.delete(linkPath, true);
+ }
}
}
@@ -174,11 +184,11 @@ public class TezLocalCacheManager {
*/
private static class ResourceInfo {
final Future<Path> downloadPath;
- final Path linkPath;
+ final Set<Path> linkPaths = new HashSet<>();
public ResourceInfo(Future<Path> downloadPath, Path linkPath) {
this.downloadPath = downloadPath;
- this.linkPath = linkPath;
+ this.linkPaths.add(linkPath);
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4c378b44/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
new file mode 100644
index 0000000..fb23a1d
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestTezLocalCacheManager {
+ // Exercises TezLocalCacheManager.localize() and cleanup() on the local filesystem; the links are expected in the current working directory ("./file-*").
+ @Test
+ public void testManager() throws URISyntaxException, IOException {
+ Map<String, LocalResource> resources = new HashMap<>();
+
+ // Localize two regular files, exposing resourceTwo under two link names (the TEZ-3876 case), and verify
+ // that every link, including both links to the shared resource, resolves to the expected content.
+ LocalResource resourceOne = createFile("content-one");
+ LocalResource resourceTwo = createFile("content-two");
+
+ resources.put("file-one", resourceOne);
+ resources.put("file-two", resourceTwo);
+ resources.put("file-three", resourceTwo);
+
+ TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration());
+
+ try {
+ manager.localize();
+
+ Assert.assertEquals(
+ "content-one",
+ new String(Files.readAllBytes(Paths.get("./file-one")))
+ );
+
+ Assert.assertEquals(
+ "content-two",
+ new String(Files.readAllBytes(Paths.get("./file-two")))
+ );
+
+ Assert.assertEquals(
+ "content-two",
+ new String(Files.readAllBytes(Paths.get("./file-three")))
+ );
+ } finally {
+ manager.cleanup();
+ }
+
+ // After cleanup() every link must be gone, including both links that pointed at resourceTwo.
+ Assert.assertFalse(Files.exists(Paths.get("./file-one")));
+ Assert.assertFalse(Files.exists(Paths.get("./file-two")));
+ Assert.assertFalse(Files.exists(Paths.get("./file-three")));
+ }
+
+ // Writes content to a new temp file (deleted on JVM exit) and returns a FILE-type, PRIVATE LocalResource describing it.
+ private static LocalResource createFile(String content) throws IOException {
+ FileContext fs = FileContext.getLocalFSFileContext();
+
+ java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt");
+ File temp = tempFile.toFile();
+ temp.deleteOnExit();
+ Path p = new Path("file:///" + tempFile.toAbsolutePath().toString()); // NOTE(review): on POSIX this yields "file:////..."; new Path(tempFile.toUri()) would be cleaner
+
+ Files.write(tempFile, content.getBytes());
+
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+ URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p);
+ ret.setResource(yarnUrlFromPath);
+ ret.setSize(content.getBytes().length);
+ ret.setType(LocalResourceType.FILE);
+ ret.setVisibility(LocalResourceVisibility.PRIVATE);
+ ret.setTimestamp(fs.getFileStatus(p).getModificationTime()); // read back from the real file; presumably the downloader checks it against the source — confirm
+ return ret;
+ }
+}