You are viewing a plain-text version of this content; the hyperlink to the canonical version is not included in this copy.
Posted to commits@tez.apache.org by je...@apache.org on 2017/10/04 21:43:50 UTC

tez git commit: TEZ-3848. Tez Local mode doesn't localize distributed cache files (Jacob Tolar via jeagles)

Repository: tez
Updated Branches:
  refs/heads/master 3b2933f01 -> b875eb1f2


TEZ-3848. Tez Local mode doesn't localize distributed cache files (Jacob Tolar via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b875eb1f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b875eb1f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b875eb1f

Branch: refs/heads/master
Commit: b875eb1f2e0a56d4b9c01127b40d81983e9be896
Parents: 3b2933f
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 4 16:43:32 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 4 16:43:32 2017 -0500

----------------------------------------------------------------------
 .../app/launcher/LocalContainerLauncher.java    |  42 ++++-
 .../dag/app/launcher/TezLocalCacheManager.java  | 184 +++++++++++++++++++
 2 files changed, 223 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b875eb1f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index d50b49e..9764daa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.dag.app.launcher;
 
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -48,7 +47,6 @@ import org.apache.tez.common.DagContainerLauncher;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -100,6 +98,9 @@ public class LocalContainerLauncher extends DagContainerLauncher {
       runningContainers =
       new ConcurrentHashMap<ContainerId, RunningTaskCallback>();
 
+  private final ConcurrentHashMap<ContainerId, TezLocalCacheManager>
+          cacheManagers = new ConcurrentHashMap<>();
+
   private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
 
@@ -229,6 +230,10 @@ public class LocalContainerLauncher extends DagContainerLauncher {
 
   private void handleLaunchFailed(Throwable t, ContainerId containerId) {
     String message;
+
+    // clean up distributed cache files
+    cleanupCacheFiles(containerId);
+
     if (t instanceof RejectedExecutionException) {
       message = "Failed to queue container launch for container Id: " + containerId;
     } else {
@@ -244,10 +249,22 @@ public class LocalContainerLauncher extends DagContainerLauncher {
     String tokenIdentifier = context.getApplicationID().toString();
     try {
       TezChild tezChild;
+
       try {
         int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName());
+
+        Configuration conf = context.getAMConf();
+        if (isLocalMode) {
+          TezLocalCacheManager cacheManager = new TezLocalCacheManager(
+              event.getContainerLaunchContext().getLocalResources(),
+              conf
+          );
+          cacheManagers.put(event.getContainerId(), cacheManager);
+          cacheManager.localize();
+        }
+
         tezChild =
-            createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
+            createTezChild(conf, event.getContainerId(), tokenIdentifier,
                 context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
                 ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(),
                 TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
@@ -322,6 +339,9 @@ public class LocalContainerLauncher extends DagContainerLauncher {
                 (result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
                 result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
       }
+
+      // clean up distributed cache files
+      cleanupCacheFiles(containerId);
     }
 
     @Override
@@ -341,6 +361,22 @@ public class LocalContainerLauncher extends DagContainerLauncher {
             TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
             "CancellationException", TaskAttemptEndReason.COMMUNICATION_ERROR.CONTAINER_EXITED);
       }
+
+      // clean up distributed cache files
+      cleanupCacheFiles(containerId);
+    }
+  }
+
+  private void cleanupCacheFiles(ContainerId container) {
+    if (isLocalMode) {
+      TezLocalCacheManager manager = cacheManagers.remove(container);
+      try {
+        if (manager != null) {
+          manager.cleanup();
+        }
+      } catch (IOException e) {
+        LOG.info("Unable to clean up local cache files: ", e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b875eb1f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
new file mode 100644
index 0000000..80f73aa
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for localizing files from the distributed cache for Tez local mode.
+ */
+public class TezLocalCacheManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezLocalCacheManager.class);
+
+  final private Map<String, LocalResource> resources;
+  final private Configuration conf;
+  final private UserGroupInformation ugi;
+  final private FileContext fileContext;
+  final private java.nio.file.Path tempDir;
+
+  final private Map<LocalResource, ResourceInfo> resourceInfo = new HashMap<>();
+
+  public TezLocalCacheManager(Map<String, LocalResource> resources, Configuration conf) throws IOException {
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.fileContext = FileContext.getLocalFSFileContext();
+    this.resources = resources;
+    this.conf = conf;
+    this.tempDir = Files.createTempDirectory(Paths.get("."), "tez-local-cache");
+  }
+
+  /**
+   * Localize this instance's resources by downloading and symlinking them.
+   *
+   * @throws IOException when an error occurs in download or link
+   */
+  public void localize() throws IOException {
+    String absPath = Paths.get(".").toAbsolutePath().normalize().toString();
+    Path cwd = fileContext.makeQualified(new Path(absPath));
+    ExecutorService threadPool = null;
+
+    try {
+      // construct new threads with helpful names
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("TezLocalCacheManager Downloader #%d")
+          .build();
+      threadPool = Executors.newCachedThreadPool(threadFactory);
+
+      // start all fetches
+      for (Map.Entry<String, LocalResource> entry : resources.entrySet()) {
+        String resourceName = entry.getKey();
+        LocalResource resource = entry.getValue();
+
+        if (resource.getType() == LocalResourceType.PATTERN) {
+          throw new IllegalArgumentException("Resource type PATTERN not supported.");
+        }
+
+        // submit task to download the object
+        java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+        Path dest = new Path(downloadDir.toAbsolutePath().toString());
+        FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
+        Future<Path> downloadedPath = threadPool.submit(downloader);
+
+        // linkPath is the path we want to symlink the file/directory into
+        Path linkPath = new Path(cwd, entry.getKey());
+        resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath));
+      }
+
+      // Link each file
+      for (Map.Entry<LocalResource, ResourceInfo> entry : resourceInfo.entrySet()) {
+        LocalResource resource = entry.getKey();
+        ResourceInfo resourceMeta = entry.getValue();
+
+        Path linkPath = resourceMeta.linkPath;
+        Path targetPath;
+
+        try {
+          // this blocks on the download completing
+          targetPath = resourceMeta.downloadPath.get();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IOException(e);
+        }
+
+        if (createSymlink(targetPath, linkPath)) {
+          LOG.info("Localized file: {} as {}", resource, linkPath);
+        } else {
+          LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
+        }
+      }
+    } finally {
+      if (threadPool != null) {
+        threadPool.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Clean up any symlinks and temp files that were created.
+   *
+   * @throws IOException when an error occurs in cleanup
+   */
+  public void cleanup() throws IOException {
+    for (ResourceInfo info : resourceInfo.values()) {
+      if (fileContext.util().exists(info.linkPath)) {
+        fileContext.delete(info.linkPath, true);
+      }
+    }
+
+    Path temp = new Path(tempDir.toString());
+    if (fileContext.util().exists(temp)) {
+      fileContext.delete(temp, true);
+    }
+  }
+
+  /**
+   * Create a symlink.
+   */
+  private boolean createSymlink(Path target, Path link) throws IOException {
+    LOG.info("Creating symlink: {} <- {}", target, link);
+    String targetPath = target.toUri().getPath();
+    String linkPath = link.toUri().getPath();
+
+    if (fileContext.util().exists(link)) {
+      LOG.warn("File already exists at symlink path: {}", link);
+      return false;
+    } else {
+      try {
+        Files.createSymbolicLink(Paths.get(linkPath), Paths.get(targetPath));
+        return true;
+      } catch (UnsupportedOperationException e) {
+        LOG.warn("Unable to create symlink {} <- {}: UnsupportedOperationException", target, link);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Wrapper to keep track of download path and link path
+   */
+  private static class ResourceInfo {
+    final Future<Path> downloadPath;
+    final Path linkPath;
+
+    public ResourceInfo(Future<Path> downloadPath, Path linkPath) {
+      this.downloadPath = downloadPath;
+      this.linkPath = linkPath;
+    }
+  }
+}