You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by je...@apache.org on 2017/10/04 21:43:50 UTC
tez git commit: TEZ-3848. Tez Local mode doesn't localize distributed cache files (Jacob Tolar via jeagles)
Repository: tez
Updated Branches:
refs/heads/master 3b2933f01 -> b875eb1f2
TEZ-3848. Tez Local mode doesn't localize distributed cache files (Jacob Tolar via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b875eb1f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b875eb1f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b875eb1f
Branch: refs/heads/master
Commit: b875eb1f2e0a56d4b9c01127b40d81983e9be896
Parents: 3b2933f
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 4 16:43:32 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 4 16:43:32 2017 -0500
----------------------------------------------------------------------
.../app/launcher/LocalContainerLauncher.java | 42 ++++-
.../dag/app/launcher/TezLocalCacheManager.java | 184 +++++++++++++++++++
2 files changed, 223 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b875eb1f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index d50b49e..9764daa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -18,7 +18,6 @@
package org.apache.tez.dag.app.launcher;
-
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -48,7 +47,6 @@ import org.apache.tez.common.DagContainerLauncher;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -100,6 +98,9 @@ public class LocalContainerLauncher extends DagContainerLauncher {
runningContainers =
new ConcurrentHashMap<ContainerId, RunningTaskCallback>();
+ private final ConcurrentHashMap<ContainerId, TezLocalCacheManager>
+ cacheManagers = new ConcurrentHashMap<>();
+
private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
@@ -229,6 +230,10 @@ public class LocalContainerLauncher extends DagContainerLauncher {
private void handleLaunchFailed(Throwable t, ContainerId containerId) {
String message;
+
+ // clean up distributed cache files
+ cleanupCacheFiles(containerId);
+
if (t instanceof RejectedExecutionException) {
message = "Failed to queue container launch for container Id: " + containerId;
} else {
@@ -244,10 +249,22 @@ public class LocalContainerLauncher extends DagContainerLauncher {
String tokenIdentifier = context.getApplicationID().toString();
try {
TezChild tezChild;
+
try {
int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName());
+
+ Configuration conf = context.getAMConf();
+ if (isLocalMode) {
+ TezLocalCacheManager cacheManager = new TezLocalCacheManager(
+ event.getContainerLaunchContext().getLocalResources(),
+ conf
+ );
+ cacheManagers.put(event.getContainerId(), cacheManager);
+ cacheManager.localize();
+ }
+
tezChild =
- createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
+ createTezChild(conf, event.getContainerId(), tokenIdentifier,
context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(),
TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
@@ -322,6 +339,9 @@ public class LocalContainerLauncher extends DagContainerLauncher {
(result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
}
+
+ // clean up distributed cache files
+ cleanupCacheFiles(containerId);
}
@Override
@@ -341,6 +361,22 @@ public class LocalContainerLauncher extends DagContainerLauncher {
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
"CancellationException", TaskAttemptEndReason.COMMUNICATION_ERROR.CONTAINER_EXITED);
}
+
+ // clean up distributed cache files
+ cleanupCacheFiles(containerId);
+ }
+ }
+
+  /**
+   * Cleans up the distributed-cache files localized for {@code container}. Does nothing outside
+   * local mode or when no cache manager is registered for the container. An IOException raised
+   * during cleanup is logged, not propagated.
+   */
+  private void cleanupCacheFiles(ContainerId container) {
+    if (isLocalMode) {
+      // remove() hands the manager to exactly one caller, so it is cleaned up at most once even
+      // though the launch-failure and completion paths all call this method.
+      TezLocalCacheManager manager = cacheManagers.remove(container);
+      try {
+        if (manager != null) {
+          manager.cleanup();
+        }
+      } catch (IOException e) {
+        // Leftover links/temp dirs in the working directory deserve an operator's attention.
+        LOG.warn("Unable to clean up local cache files: ", e);
+      }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b875eb1f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
new file mode 100644
index 0000000..80f73aa
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Localizes files from the distributed cache for Tez local mode.
+ *
+ * <p>Each distinct {@link LocalResource} is downloaded once with YARN's {@link FSDownload} into its
+ * own directory under a temporary directory that this instance creates in the current working
+ * directory. Every name the resource was registered under is then exposed as a symlink in the
+ * current working directory. {@link #cleanup()} removes the symlinks this instance created and the
+ * temporary directory.
+ *
+ * <p>Not internally synchronized.
+ */
+public class TezLocalCacheManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezLocalCacheManager.class);
+
+  /** Resources to localize, keyed by the name each is linked under in the working directory. */
+  final private Map<String, LocalResource> resources;
+  final private Configuration conf;
+  final private UserGroupInformation ugi;
+  final private FileContext fileContext;
+  /** Per-instance directory, created under the working directory, that holds all downloads. */
+  final private java.nio.file.Path tempDir;
+
+  /**
+   * Download state per distinct resource. Keyed by resource rather than by name so a resource
+   * registered under several names is fetched once and linked under each of them.
+   */
+  final private Map<LocalResource, ResourceInfo> resourceInfo = new HashMap<>();
+
+  /**
+   * Symlinks actually created by {@link #localize()}. Only these are deleted by
+   * {@link #cleanup()}, so a pre-existing file at a link path is never removed.
+   */
+  final private List<java.nio.file.Path> createdLinks = new ArrayList<>();
+
+  /**
+   * Creates a manager for {@code resources} and, as a side effect, a fresh temporary directory
+   * named {@code tez-local-cache*} in the current working directory.
+   *
+   * @param resources map from link name to the resource to localize under that name
+   * @param conf configuration handed to each {@link FSDownload}
+   * @throws IOException if the current user cannot be resolved or the temp dir cannot be created
+   */
+  public TezLocalCacheManager(Map<String, LocalResource> resources, Configuration conf)
+      throws IOException {
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.fileContext = FileContext.getLocalFSFileContext();
+    this.resources = resources;
+    this.conf = conf;
+    this.tempDir = Files.createTempDirectory(Paths.get("."), "tez-local-cache");
+  }
+
+  /**
+   * Localizes this instance's resources. Each distinct resource is downloaded on its own pool
+   * thread, then symlinked into the current working directory under every name it was registered
+   * with. A link whose path is already occupied is skipped with a warning.
+   *
+   * @throws IllegalArgumentException if any resource has type {@link LocalResourceType#PATTERN};
+   *     this is checked before any download starts
+   * @throws IOException when a download fails or is interrupted, or a link cannot be created
+   */
+  public void localize() throws IOException {
+    // Validate up front so no download is started for a request that will be rejected anyway.
+    for (LocalResource resource : resources.values()) {
+      if (resource.getType() == LocalResourceType.PATTERN) {
+        throw new IllegalArgumentException("Resource type PATTERN not supported.");
+      }
+    }
+
+    String absPath = Paths.get(".").toAbsolutePath().normalize().toString();
+    Path cwd = fileContext.makeQualified(new Path(absPath));
+    ExecutorService threadPool = null;
+
+    try {
+      // construct new threads with helpful names
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("TezLocalCacheManager Downloader #%d")
+          .build();
+      threadPool = Executors.newCachedThreadPool(threadFactory);
+
+      // Start one fetch per distinct resource and remember every name to link it under.
+      for (Map.Entry<String, LocalResource> entry : resources.entrySet()) {
+        String resourceName = entry.getKey();
+        LocalResource resource = entry.getValue();
+
+        ResourceInfo info = resourceInfo.get(resource);
+        if (info == null) {
+          java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+          Path dest = new Path(downloadDir.toAbsolutePath().toString());
+          FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
+          info = new ResourceInfo(threadPool.submit(downloader));
+          resourceInfo.put(resource, info);
+        }
+        // linkPath is the path we want to symlink the file/directory into
+        info.linkPaths.add(new Path(cwd, resourceName));
+      }
+
+      // Wait for each download, then link it under all of its names.
+      for (Map.Entry<LocalResource, ResourceInfo> entry : resourceInfo.entrySet()) {
+        LocalResource resource = entry.getKey();
+        ResourceInfo info = entry.getValue();
+        Path targetPath = awaitDownload(info);
+
+        for (Path linkPath : info.linkPaths) {
+          if (createSymlink(targetPath, linkPath)) {
+            LOG.info("Localized file: {} as {}", resource, linkPath);
+          } else {
+            LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
+          }
+        }
+      }
+    } finally {
+      if (threadPool != null) {
+        // On an early exit this interrupts downloads that are still running.
+        threadPool.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Blocks until {@code info}'s download completes and returns the downloaded path.
+   *
+   * @throws IOException if the download failed or the wait was interrupted
+   */
+  private static Path awaitDownload(ResourceInfo info) throws IOException {
+    try {
+      return info.downloadPath.get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while localizing " + info.linkPaths, e);
+    } catch (ExecutionException e) {
+      throw new IOException("Failed to localize " + info.linkPaths, e);
+    }
+  }
+
+  /**
+   * Deletes the symlinks created by {@link #localize()} and the temporary download directory.
+   * Files that already existed at a link path are left untouched. Symlinks are deleted without
+   * following them, so their targets are removed only as part of the temp directory.
+   *
+   * @throws IOException when an error occurs in cleanup
+   */
+  public void cleanup() throws IOException {
+    for (java.nio.file.Path link : createdLinks) {
+      Files.deleteIfExists(link);
+    }
+    createdLinks.clear();
+
+    Path temp = new Path(tempDir.toString());
+    if (fileContext.util().exists(temp)) {
+      fileContext.delete(temp, true);
+    }
+  }
+
+  /**
+   * Creates {@code link} as a symlink to {@code target} and records it for {@link #cleanup()}.
+   *
+   * @return true if the link was created; false if something already occupies {@code link}
+   *     (including a dangling symlink) or the file system does not support symlinks
+   * @throws IOException if link creation fails for any other reason
+   */
+  private boolean createSymlink(Path target, Path link) throws IOException {
+    LOG.info("Creating symlink: {} <- {}", target, link);
+    java.nio.file.Path targetPath = Paths.get(target.toUri().getPath());
+    java.nio.file.Path linkPath = Paths.get(link.toUri().getPath());
+
+    // Files.exists follows links, so also check isSymbolicLink to catch a dangling symlink.
+    if (Files.exists(linkPath) || Files.isSymbolicLink(linkPath)) {
+      LOG.warn("File already exists at symlink path: {}", link);
+      return false;
+    }
+    try {
+      Files.createSymbolicLink(linkPath, targetPath);
+    } catch (UnsupportedOperationException e) {
+      LOG.warn("Unable to create symlink {} <- {}: UnsupportedOperationException", target, link);
+      return false;
+    }
+    createdLinks.add(linkPath);
+    return true;
+  }
+
+  /**
+   * Pending download of one resource, plus every path it should be symlinked at.
+   */
+  private static class ResourceInfo {
+    final Future<Path> downloadPath;
+    final List<Path> linkPaths = new ArrayList<>();
+
+    ResourceInfo(Future<Path> downloadPath) {
+      this.downloadPath = downloadPath;
+    }
+  }
+}