You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/12/13 00:01:47 UTC

[5/8] tez git commit: TEZ-3876. Bug in local mode distributed cache files (Jacob Tolar via jeagles)

TEZ-3876. Bug in local mode distributed cache files (Jacob Tolar via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c378b44
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c378b44
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c378b44

Branch: refs/heads/branch-0.9.1
Commit: 4c378b443b20e1f643e894e81ec41271d0356b3f
Parents: cfede26
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Mon Dec 11 14:40:04 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Mon Dec 11 14:40:04 2017 -0600

----------------------------------------------------------------------
 .../dag/app/launcher/TezLocalCacheManager.java  |  60 ++++++-----
 .../app/launcher/TestTezLocalCacheManager.java  | 107 +++++++++++++++++++
 2 files changed, 142 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4c378b44/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
index 80f73aa..45e5540 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
@@ -31,7 +31,9 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -90,15 +92,20 @@ public class TezLocalCacheManager {
           throw new IllegalArgumentException("Resource type PATTERN not supported.");
         }
 
-        // submit task to download the object
-        java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
-        Path dest = new Path(downloadDir.toAbsolutePath().toString());
-        FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
-        Future<Path> downloadedPath = threadPool.submit(downloader);
-
         // linkPath is the path we want to symlink the file/directory into
         Path linkPath = new Path(cwd, entry.getKey());
-        resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath));
+
+        if (resourceInfo.containsKey(resource)) {
+            // We've already downloaded this resource and just need to add another link.
+            resourceInfo.get(resource).linkPaths.add(linkPath);
+        } else {
+          // submit task to download the object
+          java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+          Path dest = new Path(downloadDir.toAbsolutePath().toString());
+          FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
+          Future<Path> downloadedPath = threadPool.submit(downloader);
+          resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath));
+        }
       }
 
       // Link each file
@@ -106,20 +113,21 @@ public class TezLocalCacheManager {
         LocalResource resource = entry.getKey();
         ResourceInfo resourceMeta = entry.getValue();
 
-        Path linkPath = resourceMeta.linkPath;
-        Path targetPath;
-
-        try {
-          // this blocks on the download completing
-          targetPath = resourceMeta.downloadPath.get();
-        } catch (InterruptedException | ExecutionException e) {
-          throw new IOException(e);
-        }
-
-        if (createSymlink(targetPath, linkPath)) {
-          LOG.info("Localized file: {} as {}", resource, linkPath);
-        } else {
-          LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
+        for (Path linkPath : resourceMeta.linkPaths) {
+          Path targetPath;
+
+          try {
+            // this blocks on the download completing
+            targetPath = resourceMeta.downloadPath.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new IOException(e);
+          }
+
+          if (createSymlink(targetPath, linkPath)) {
+            LOG.info("Localized file: {} as {}", resource, linkPath);
+          } else {
+            LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
+          }
         }
       }
     } finally {
@@ -136,8 +144,10 @@ public class TezLocalCacheManager {
    */
   public void cleanup() throws IOException {
     for (ResourceInfo info : resourceInfo.values()) {
-      if (fileContext.util().exists(info.linkPath)) {
-        fileContext.delete(info.linkPath, true);
+      for (Path linkPath : info.linkPaths) {
+        if (fileContext.util().exists(linkPath)) {
+          fileContext.delete(linkPath, true);
+        }
       }
     }
 
@@ -174,11 +184,11 @@ public class TezLocalCacheManager {
    */
   private static class ResourceInfo {
     final Future<Path> downloadPath;
-    final Path linkPath;
+    final Set<Path> linkPaths = new HashSet<>();
 
     public ResourceInfo(Future<Path> downloadPath, Path linkPath) {
       this.downloadPath = downloadPath;
-      this.linkPath = linkPath;
+      this.linkPaths.add(linkPath);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4c378b44/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
new file mode 100644
index 0000000..fb23a1d
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestTezLocalCacheManager {
+    // Checks that TezLocalCacheManager links every requested name and that cleanup() removes the links again.
+    @Test
+    public void testManager() throws URISyntaxException, IOException {
+        Map<String, LocalResource> resources = new HashMap<>();
+
+        // Two distinct files are registered under three link names: "file-two" and "file-three" map to the
+        // same LocalResource, so one resource has to end up readable through more than one link.
+        LocalResource resourceOne = createFile("content-one");
+        LocalResource resourceTwo = createFile("content-two");
+
+        resources.put("file-one", resourceOne);
+        resources.put("file-two", resourceTwo);
+        resources.put("file-three", resourceTwo); // same resource as "file-two", second link name
+
+        TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration());
+
+        try {
+            manager.localize();
+
+            Assert.assertEquals(
+                    "content-one",
+                    new String(Files.readAllBytes(Paths.get("./file-one"))) // relative path: resolved against the working directory
+            );
+
+            Assert.assertEquals(
+                    "content-two",
+                    new String(Files.readAllBytes(Paths.get("./file-two")))
+            );
+
+            Assert.assertEquals(
+                    "content-two",
+                    new String(Files.readAllBytes(Paths.get("./file-three")))
+            );
+        } finally {
+            manager.cleanup(); // runs even when an assertion above fails
+        }
+
+        // after cleanup() none of the three link names may exist any more
+        Assert.assertFalse(Files.exists(Paths.get("./file-one")));
+        Assert.assertFalse(Files.exists(Paths.get("./file-two")));
+        Assert.assertFalse(Files.exists(Paths.get("./file-three")));
+    }
+
+    // Writes content to a fresh temp file (deleted on JVM exit) and returns a FILE/PRIVATE LocalResource describing it.
+    private static LocalResource createFile(String content) throws IOException {
+        FileContext fs = FileContext.getLocalFSFileContext();
+
+        java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt");
+        File temp = tempFile.toFile();
+        temp.deleteOnExit();
+        Path p = new Path("file:///" + tempFile.toAbsolutePath().toString()); // Hadoop Path with an explicit file: scheme
+
+        Files.write(tempFile, content.getBytes());
+
+        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+        LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+        URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p);
+        ret.setResource(yarnUrlFromPath);
+        ret.setSize(content.getBytes().length); // byte length in the platform default charset
+        ret.setType(LocalResourceType.FILE);
+        ret.setVisibility(LocalResourceVisibility.PRIVATE);
+        ret.setTimestamp(fs.getFileStatus(p).getModificationTime()); // modification time as reported by the local FileContext
+        return ret;
+    }
+}