You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:10:27 UTC
[27/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java
new file mode 100644
index 0000000..aa5edf1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java
@@ -0,0 +1,915 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.common.tools;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
+import org.apache.slider.core.persist.Filenames;
+import org.apache.slider.core.persist.InstancePaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import static org.apache.slider.common.SliderXmlConfKeys.CLUSTER_DIRECTORY_PERMISSIONS;
+import static org.apache.slider.common.SliderXmlConfKeys.DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS;
+
+public class CoreFileSystem {
+ /** Class-wide SLF4J logger. */
+ private static final Logger
+ log = LoggerFactory.getLogger(CoreFileSystem.class);
+
+ /** Charset name used by {@link #cat(Path)} to decode file contents. */
+ private static final String UTF_8 = "UTF-8";
+
+ /** Filesystem against which the operations of this class are issued. */
+ protected final FileSystem fileSystem;
+ /** Configuration consulted for the base path and directory permission settings. */
+ protected final Configuration configuration;
+
+ /**
+  * Bind this instance to an existing filesystem and configuration.
+  *
+  * @param fileSystem filesystem to operate on; must not be null
+  * @param configuration configuration to read settings from; must not be null
+  * @throws NullPointerException if either argument is null
+  */
+ public CoreFileSystem(FileSystem fileSystem, Configuration configuration) {
+   this.fileSystem = Preconditions.checkNotNull(fileSystem,
+       "Cannot create a CoreFileSystem with a null FileSystem");
+   this.configuration = Preconditions.checkNotNull(configuration,
+       "Cannot create a CoreFileSystem with a null Configuration");
+ }
+
+ /**
+  * Create an instance on the filesystem that
+  * {@link FileSystem#get(Configuration)} returns for the supplied
+  * configuration. The configuration retained by this instance is the one
+  * reported by that filesystem.
+  *
+  * @param configuration configuration used to look up the filesystem; must not be null
+  * @throws IOException if the filesystem lookup fails
+  * @throws NullPointerException if the configuration is null
+  */
+ public CoreFileSystem(Configuration configuration) throws IOException {
+   FileSystem resolved = FileSystem.get(Preconditions.checkNotNull(configuration,
+       "Cannot create a CoreFileSystem with a null Configuration"));
+   this.fileSystem = resolved;
+   this.configuration = resolved.getConf();
+ }
+
+ /**
+  * Build the temporary-files path of a cluster: the
+  * {@link SliderKeys#TMP_DIR_PREFIX} child of the cluster directory.
+  * Nothing is created or deleted.
+  *
+  * @param clustername name of the cluster
+  * @return path for temp files
+  */
+ public Path getTempPathForCluster(String clustername) {
+   return new Path(buildClusterDirPath(clustername), SliderKeys.TMP_DIR_PREFIX);
+ }
+
+ /**
+  * Accessor for the filesystem this instance was constructed with.
+  *
+  * @return the wrapped filesystem
+  */
+ public FileSystem getFileSystem() {
+   return this.fileSystem;
+ }
+
+ /**
+  * Describe this instance by the URI of its filesystem.
+  *
+  * @return a string of the form {@code CoreFileSystem{fileSystem=<uri>}}
+  */
+ @Override
+ public String toString() {
+   return "CoreFileSystem{" + "fileSystem=" + fileSystem.getUri() + '}';
+ }
+
+ /**
+  * Compute the directory of a cluster instance beneath the base application
+  * path. This is pure path arithmetic: the directory is not created.
+  *
+  * @param clustername name of the cluster; must not be null
+  * @return the path for persistent data
+  * @throws NullPointerException if {@code clustername} is null
+  */
+ public Path buildClusterDirPath(String clustername) {
+   Preconditions.checkNotNull(clustername);
+   Path base = getBaseApplicationPath();
+   String relative = SliderKeys.CLUSTER_DIRECTORY + "/" + clustername;
+   return new Path(base, relative);
+ }
+
+ /**
+  * Compute the application-definition directory of a cluster: the
+  * {@link SliderKeys#APP_DEF_DIR} child of the cluster directory.
+  * The directory is not created.
+  *
+  * @param clustername name of the cluster
+  * @return the app def directory path
+  */
+ public Path buildAppDefDirPath(String clustername) {
+   return new Path(buildClusterDirPath(clustername), SliderKeys.APP_DEF_DIR);
+ }
+
+ /**
+  * Compute the directory of one addon of a cluster, located under
+  * {@link SliderKeys#ADDONS_DIR} in the cluster directory.
+  * The directory is not created.
+  *
+  * @param clustername name of the cluster
+  * @param addonId identifier of the addon; must not be null
+  * @return the addon directory path
+  * @throws NullPointerException if {@code addonId} or {@code clustername} is null
+  */
+ public Path buildAddonDirPath(String clustername, String addonId) {
+   Preconditions.checkNotNull(addonId);
+   Path clusterDir = buildClusterDirPath(clustername);
+   String relative = SliderKeys.ADDONS_DIR + "/" + addonId;
+   return new Path(clusterDir, relative);
+ }
+
+ /**
+  * Compute the install location of an application package beneath the base
+  * application path. A version subdirectory is appended only when
+  * {@code SliderUtils.isSet(packageVersion)} holds. Nothing is created.
+  *
+  * @param packageName name of the package; must not be null
+  * @param packageVersion optional package version
+  * @return the path for the persistent app package
+  * @throws NullPointerException if {@code packageName} is null
+  */
+ public Path buildPackageDirPath(String packageName, String packageVersion) {
+   Preconditions.checkNotNull(packageName);
+   Path packageDir = new Path(getBaseApplicationPath(),
+       SliderKeys.PACKAGE_DIRECTORY + "/" + packageName);
+   if (!SliderUtils.isSet(packageVersion)) {
+     return packageDir;
+   }
+   return new Path(packageDir, packageVersion);
+ }
+
+ /**
+  * Compute the security directory of a cluster: the
+  * {@link SliderKeys#SECURITY_DIR} child of the cluster directory.
+  * The directory is not created.
+  *
+  * @param clusterName name of the cluster; must not be null
+  * @return the security directory path
+  * @throws NullPointerException if {@code clusterName} is null
+  */
+ public Path buildClusterSecurityDirPath(String clusterName) {
+   Preconditions.checkNotNull(clusterName);
+   return new Path(buildClusterDirPath(clusterName), SliderKeys.SECURITY_DIR);
+ }
+
+ /**
+  * Compute the keytab install directory for a keytab folder, located under
+  * {@link SliderKeys#KEYTAB_DIR} in the base application path.
+  * The directory is not created.
+  *
+  * @param keytabFolder folder name; must not be null
+  * @return the keytab installation directory path
+  * @throws NullPointerException if {@code keytabFolder} is null
+  */
+ public Path buildKeytabInstallationDirPath(String keytabFolder) {
+   Preconditions.checkNotNull(keytabFolder);
+   String relative = SliderKeys.KEYTAB_DIR + "/" + keytabFolder;
+   return new Path(getBaseApplicationPath(), relative);
+ }
+
+ /**
+  * Compute the path of a keytab, or of its containing directory.
+  * When {@code keytabDir} is non-null the directory is resolved against the
+  * home directory; otherwise it is the {@link SliderKeys#KEYTAB_DIR} child of
+  * the cluster directory. Nothing is created.
+  *
+  * @param keytabDir keytab directory relative to the home directory, or null
+  * @param keytabName keytab file name, or null to get the directory itself
+  * @param clusterName cluster name; only used when {@code keytabDir} is null
+  * @return the keytab path, or the keytab directory if no name was given
+  */
+ public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) {
+   Path home = getHomeDirectory();
+   Path keytabBase = (keytabDir == null)
+       ? new Path(buildClusterDirPath(clusterName), SliderKeys.KEYTAB_DIR)
+       : new Path(home, keytabDir);
+   if (keytabName == null) {
+     return keytabBase;
+   }
+   return new Path(keytabBase, keytabName);
+ }
+
+ /**
+  * Compute the install location of a resource folder, located under
+  * {@link SliderKeys#RESOURCE_DIR} in the base application path.
+  * The directory is not created.
+  *
+  * @param resourceFolder folder name; must not be null
+  * @return the path for the resource folder
+  * @throws NullPointerException if {@code resourceFolder} is null
+  */
+ public Path buildResourcePath(String resourceFolder) {
+   Preconditions.checkNotNull(resourceFolder);
+   String relative = SliderKeys.RESOURCE_DIR + "/" + resourceFolder;
+   return new Path(getBaseApplicationPath(), relative);
+ }
+
+ /**
+  * Compute the install location of a single resource file, located at
+  * {@code <RESOURCE_DIR>/<dirName>/<fileName>} under the base application
+  * path. Nothing is created.
+  *
+  * @param dirName resource directory name; must not be null
+  * @param fileName resource file name; must not be null
+  * @return the path for the resource file
+  * @throws NullPointerException if either argument is null
+  */
+ public Path buildResourcePath(String dirName, String fileName) {
+   Preconditions.checkNotNull(dirName);
+   Preconditions.checkNotNull(fileName);
+   String relative = SliderKeys.RESOURCE_DIR + "/" + dirName + "/" + fileName;
+   return new Path(getBaseApplicationPath(), relative);
+ }
+
+ /**
+  * Compute the resource location of one component of a cluster, located at
+  * {@code <RESOURCE_DIR>/<component>} under the cluster directory.
+  * Nothing is created.
+  *
+  * @param clusterName name of the cluster; must not be null
+  * @param component component name appended to the resource directory
+  * @return the path for the component's resources
+  * @throws NullPointerException if {@code clusterName} is null
+  */
+ public Path buildClusterResourcePath(String clusterName, String component) {
+   Preconditions.checkNotNull(clusterName);
+   Path clusterDir = buildClusterDirPath(clusterName);
+   String relative = SliderKeys.RESOURCE_DIR + "/" + component;
+   return new Path(clusterDir, relative);
+ }
+
+ /**
+  * Compute the resource directory of a cluster: the
+  * {@link SliderKeys#RESOURCE_DIR} child of the cluster directory.
+  * Nothing is created.
+  *
+  * @param clusterName name of the cluster; must not be null
+  * @return the path for the cluster's resources
+  * @throws NullPointerException if {@code clusterName} is null
+  */
+ public Path buildClusterResourcePath(String clusterName) {
+   Preconditions.checkNotNull(clusterName);
+   return new Path(buildClusterDirPath(clusterName), SliderKeys.RESOURCE_DIR);
+ }
+
+ /**
+  * Create the directory tree of a named cluster by delegating to
+  * {@link #createClusterDirectories(InstancePaths)} with the instance paths
+  * derived from the cluster directory.
+  *
+  * @param clustername name of the cluster
+  * @param conf not read by this method
+  * @return the path to the cluster directory
+  * @throws java.io.IOException trouble
+  * @throws SliderException slider-specific exceptions
+  */
+ public Path createClusterDirectories(String clustername, Configuration conf)
+     throws IOException, SliderException {
+   final Path clusterDir = buildClusterDirPath(clustername);
+   createClusterDirectories(new InstancePaths(clusterDir));
+   return clusterDir;
+ }
+
+ /**
+ * Create the Slider cluster path for a named cluster and all its subdirs
+ * This is a directory; a mkdirs() operation is executed
+ * to ensure that it is there.
+ *
+ * @param instancePaths instance paths
+ * @throws IOException trouble
+ * @throws SliderException slider-specific exceptions
+ */
+ public void createClusterDirectories(InstancePaths instancePaths) throws
+ IOException, SliderException {
+ Path instanceDir = instancePaths.instanceDir;
+
+ verifyDirectoryNonexistent(instanceDir);
+ FsPermission clusterPerms = getInstanceDirectoryPermissions();
+ createWithPermissions(instanceDir, clusterPerms);
+ createWithPermissions(instancePaths.snapshotConfPath, clusterPerms);
+ createWithPermissions(instancePaths.generatedConfPath, clusterPerms);
+ createWithPermissions(instancePaths.historyPath, clusterPerms);
+ createWithPermissions(instancePaths.tmpPathAM, clusterPerms);
+
+ // Data Directory
+ String dataOpts =
+ configuration.get(SliderXmlConfKeys.DATA_DIRECTORY_PERMISSIONS,
+ SliderXmlConfKeys.DEFAULT_DATA_DIRECTORY_PERMISSIONS);
+ log.debug("Setting data directory permissions to {}", dataOpts);
+ createWithPermissions(instancePaths.dataPath, new FsPermission(dataOpts));
+
+ }
+
+ /**
+ * Create a directory with the given permissions.
+ *
+ * @param dir directory
+ * @param clusterPerms cluster permissions
+ * @throws IOException IO problem
+ * @throws BadClusterStateException any cluster state problem
+ */
+ public void createWithPermissions(Path dir, FsPermission clusterPerms) throws
+ IOException,
+ BadClusterStateException {
+ if (fileSystem.isFile(dir)) {
+ // HADOOP-9361 shows some filesystems don't correctly fail here
+ throw new BadClusterStateException(
+ "Cannot create a directory over a file %s", dir);
+ }
+ log.debug("mkdir {} with perms {}", dir, clusterPerms);
+ //no mask whatoever
+ fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+ fileSystem.mkdirs(dir, clusterPerms);
+ //and force set it anyway just to make sure
+ fileSystem.setPermission(dir, clusterPerms);
+ }
+
+ /**
+  * Look up the permissions recorded in the file status of a path.
+  *
+  * @param path path to check
+  * @return the permissions
+  * @throws IOException any IO problem (including file not found)
+  */
+ public FsPermission getPathPermissions(Path path) throws IOException {
+   return fileSystem.getFileStatus(path).getPermission();
+ }
+
+ /**
+  * Read the permissions for instance directories from the configuration key
+  * {@code CLUSTER_DIRECTORY_PERMISSIONS}, falling back to
+  * {@code DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS}.
+  *
+  * @return the permissions parsed from the configured string
+  */
+ public FsPermission getInstanceDirectoryPermissions() {
+   return new FsPermission(
+       configuration.get(CLUSTER_DIRECTORY_PERMISSIONS,
+           DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS));
+ }
+
+ /**
+  * Fail if the cluster directory is already present.
+  *
+  * @param clustername name of the cluster (used in the error only)
+  * @param clusterDirectory actual directory to look for
+  * @throws IOException trouble with FS
+  * @throws SliderException with exit code
+  * {@link SliderExitCodes#EXIT_INSTANCE_EXISTS} if the directory exists
+  */
+ public void verifyClusterDirectoryNonexistent(String clustername,
+     Path clusterDirectory)
+     throws IOException, SliderException {
+   if (!fileSystem.exists(clusterDirectory)) {
+     return;
+   }
+   throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+       ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername,
+       clusterDirectory);
+ }
+ /**
+ * Verify that the given directory is not present
+ *
+ * @param clusterDirectory actual directory to look for
+ * @throws IOException trouble with FS
+ * @throws SliderException If the directory exists
+ */
+ public void verifyDirectoryNonexistent(Path clusterDirectory) throws
+ IOException,
+ SliderException {
+ if (fileSystem.exists(clusterDirectory)) {
+
+ log.error("Dir {} exists: {}",
+ clusterDirectory,
+ listFSDir(clusterDirectory));
+ throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+ ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS,
+ clusterDirectory);
+ }
+ }
+
+ /**
+  * Check that the current user can write to a directory by creating, and
+  * then deleting, a probe file named {@code tmp-file-for-checks} inside it.
+  *
+  * @param dirPath actual directory to look for
+  * @throws FileNotFoundException if {@code dirPath} does not exist
+  * @throws IOException trouble with FS while checking for existence
+  * @throws BadClusterStateException if the probe file cannot be created or
+  * deleted
+  */
+ public void verifyDirectoryWriteAccess(Path dirPath) throws IOException,
+     SliderException {
+   verifyPathExists(dirPath);
+   final Path probe = new Path(dirPath, "tmp-file-for-checks");
+   try {
+     IOUtils.closeStream(fileSystem.create(probe, true));
+     fileSystem.delete(probe, false);
+   } catch (IOException ioe) {
+     log.warn("Failed to create file {}: {}", probe, ioe);
+     throw new BadClusterStateException(ioe,
+         "Unable to write to directory %s : %s", dirPath, ioe.toString());
+   }
+ }
+
+ /**
+  * Fail unless the path exists on the filesystem.
+  *
+  * @param path path to check
+  * @throws FileNotFoundException if the path does not exist
+  * @throws IOException trouble with FS
+  */
+ public void verifyPathExists(Path path) throws IOException {
+   if (fileSystem.exists(path)) {
+     return;
+   }
+   throw new FileNotFoundException(path.toString());
+ }
+
+ /**
+  * Fail unless the path exists and refers to a file.
+  *
+  * @param path path to check
+  * @throws FileNotFoundException if the path is not a file; a missing path
+  * surfaces as whatever the filesystem's status lookup throws
+  * @throws IOException trouble with FS
+  */
+ public void verifyFileExists(Path path) throws IOException {
+   final boolean regularFile = fileSystem.getFileStatus(path).isFile();
+   if (regularFile) {
+     return;
+   }
+   throw new FileNotFoundException("Not a file: " + path.toString());
+ }
+
+ /**
+  * Non-throwing probe for "exists and is a file".
+  *
+  * @param path absolute path to the file to check
+  * @return true if and only if the path exists and is a file; false in every
+  * other case, including when the status lookup throws an IOException
+  */
+ public boolean isFile(Path path) {
+   try {
+     return fileSystem.getFileStatus(path).isFile();
+   } catch (IOException ignored) {
+     // a failed lookup is deliberately reported as "not a file"
+     return false;
+   }
+ }
+
+ /**
+  * Verify that the zip file at {@code path} contains an entry whose name ends
+  * with {@code file}. The zip is downloaded to a uniquely named local
+  * temporary file, which is always removed again, whether or not the check
+  * succeeds.
+  *
+  * @param path path to zip file
+  * @param file file expected to be in zip (matched as a suffix of the entry name)
+  * @throws FileNotFoundException if no matching entry is found
+  * @throws IOException trouble with FS, or the download cannot be opened as a zip
+  */
+ public void verifyFileExistsInZip(Path path, String file) throws IOException {
+   // A unique temp file avoids clobbering (or racing on) a fixed /tmp/<name>.
+   File localCopy = File.createTempFile("slider-zip-check-", ".zip");
+   boolean found = false;
+   try {
+     // raw local filesystem: no .crc side file is left next to the copy
+     fileSystem.copyToLocalFile(false, path,
+         new Path(localCopy.getAbsolutePath()), true);
+     ZipFile zipFile = new ZipFile(localCopy);
+     try {
+       Enumeration<? extends ZipEntry> entries = zipFile.entries();
+       while (!found && entries.hasMoreElements()) {
+         found = entries.nextElement().getName().endsWith(file);
+       }
+     } finally {
+       zipFile.close();
+     }
+   } finally {
+     // clean up on every exit path, including copy and zip-open failures
+     if (!localCopy.delete()) {
+       log.debug("Unable to delete temporary file {}", localCopy);
+     }
+   }
+   if (!found) {
+     throw new FileNotFoundException("file: " + file + " not found in " + path);
+   }
+   log.info("Verification of {} passed", path);
+ }
+ /**
+  * Create a subdirectory of the cluster's temporary path on the filesystem.
+  *
+  * @param clustername name of the cluster
+  * @param subdir name of the subdirectory (e.g. an application ID)
+  * @return the path on which mkdirs has been invoked
+  * @throws IOException trouble with FS
+  */
+ public Path createAppInstanceTempPath(String clustername, String subdir)
+     throws IOException {
+   Path created = new Path(getTempPathForCluster(clustername), subdir);
+   fileSystem.mkdirs(created);
+   return created;
+ }
+
+ /**
+  * Recursively delete the temporary path of a cluster.
+  *
+  * @param clustername name of the cluster
+  * @return the temporary path on which the recursive delete was invoked
+  * @throws IOException trouble with FS
+  */
+ public Path purgeAppInstanceTempFiles(String clustername) throws
+     IOException {
+   final Path clusterTmp = getTempPathForCluster(clustername);
+   fileSystem.delete(clusterTmp, true);
+   return clusterTmp;
+ }
+
+ /**
+  * Get the base path under which everything else is stored.
+  *
+  * @return the path configured by
+  * {@link SliderXmlConfKeys#KEY_SLIDER_BASE_PATH} if that key is set;
+  * otherwise {@link SliderKeys#SLIDER_BASE_DIRECTORY} under the home directory
+  */
+ public Path getBaseApplicationPath() {
+   String configured = configuration.get(SliderXmlConfKeys.KEY_SLIDER_BASE_PATH);
+   if (configured == null) {
+     return new Path(getHomeDirectory(), SliderKeys.SLIDER_BASE_DIRECTORY);
+   }
+   return new Path(configured);
+ }
+
+ /**
+  * Get the parent directory of the slider dependency tarball. The directory
+  * pattern is {@link SliderKeys#SLIDER_DEPENDENCY_DIR}, prefixed with
+  * {@link SliderKeys#SLIDER_DEPENDENCY_HDP_PARENT_DIR} when
+  * {@code SliderUtils.isHdp()} is true, and is formatted with the slider
+  * version.
+  *
+  * @return the parent dir path of slider.tar.gz in HDFS
+  */
+ public Path getDependencyPath() {
+   String pattern = SliderKeys.SLIDER_DEPENDENCY_DIR;
+   if (SliderUtils.isHdp()) {
+     pattern = SliderKeys.SLIDER_DEPENDENCY_HDP_PARENT_DIR + pattern;
+   }
+   return new Path(String.format(pattern, SliderUtils.getSliderVersion()));
+ }
+
+ /**
+  * Get the path of the slider dependency tarball: the configured file name
+  * plus extension, inside {@link #getDependencyPath()}.
+  *
+  * @return the path to slider.tar.gz in HDFS
+  */
+ public Path getDependencyTarGzip() {
+   String parent = getDependencyPath().toUri().toString();
+   String tarballName = SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_NAME
+       + SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_EXT;
+   return new Path(parent, tarballName);
+ }
+
+ /**
+  * Get the home directory as reported by the wrapped filesystem.
+  *
+  * @return the home directory path
+  */
+ public Path getHomeDirectory() {
+   return this.fileSystem.getHomeDirectory();
+ }
+
+ /**
+  * Register an image as an ARCHIVE local resource under the key
+  * {@link SliderKeys#LOCAL_TARBALL_INSTALL_SUBDIR}, if a path is given.
+  *
+  * @param localResources map to add the resource to
+  * @param imagePath path of the image, or null
+  * @return true if a resource was added; false if {@code imagePath} was null
+  * @throws IOException trouble reading the status of the image
+  */
+ public boolean maybeAddImagePath(Map<String, LocalResource> localResources,
+     Path imagePath) throws IOException {
+   if (imagePath == null) {
+     return false;
+   }
+   LocalResource image = createAmResource(imagePath, LocalResourceType.ARCHIVE);
+   localResources.put(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR, image);
+   return true;
+ }
+
+ /**
+  * String variant of {@link #maybeAddImagePath(Map, Path)}.
+  *
+  * @param localResources map to add the resource to
+  * @param imagePath path of the image as a string, or null
+  * @return true if a resource was added; false if {@code imagePath} was null
+  * @throws IOException trouble reading the status of the image
+  */
+ public boolean maybeAddImagePath(Map<String, LocalResource> localResources,
+     String imagePath) throws IOException {
+   if (imagePath == null) {
+     return false;
+   }
+   return maybeAddImagePath(localResources, new Path(imagePath));
+ }
+
+
+
+
+ /**
+  * Create a local resource record describing a path on the filesystem.
+  *
+  * @param destPath dest path in filesystem
+  * @param resourceType resource type
+  * @return the resource, set up with application-level visibility and with
+  * the timestamp and size taken from the file status
+  * @throws IOException trouble reading the file status or resolving the path
+  */
+ public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException {
+   final FileStatus status = fileSystem.getFileStatus(destPath);
+   final LocalResource resource = Records.newRecord(LocalResource.class);
+   resource.setType(resourceType);
+   // application-level visibility
+   resource.setVisibility(LocalResourceVisibility.APPLICATION);
+   // the location the resource is to be copied from
+   Path resolved = fileSystem.resolvePath(status.getPath());
+   resource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
+   // timestamp and length let the framework sanity-check, after the copy,
+   // that the resource is the one the client intended to use
+   resource.setTimestamp(status.getModificationTime());
+   resource.setSize(status.getLen());
+   return resource;
+ }
+
+ /**
+  * Build FILE local resources for every entry listed directly under a
+  * directory, keyed by {@code destRelativeDir + "/" + <entry name>}.
+  *
+  * @param srcDir src dir
+  * @param destRelativeDir dest dir (no trailing /)
+  * @return the map of entries
+  * @throws IOException trouble listing the directory or reading an entry
+  */
+ public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelativeDir) throws IOException {
+   final FileStatus[] children = fileSystem.listStatus(srcDir);
+   final Map<String, LocalResource> resources =
+       new HashMap<String, LocalResource>(children.length);
+   for (int i = 0; i < children.length; i++) {
+     Path childPath = children[i].getPath();
+     resources.put(destRelativeDir + "/" + childPath.getName(),
+         createAmResource(childPath, LocalResourceType.FILE));
+   }
+   return resources;
+ }
+
+ /**
+  * Locate the JAR containing a class and submit it through
+  * {@link #submitFile(File, Path, String, String)}.
+  *
+  * @param clazz class to look for
+  * @param tempPath parent path the subdirectory is resolved against
+  * @param subdir subdirectory under {@code tempPath}
+  * @param jarName name of the JAR <i>at the destination</i>
+  * @return the local resource ref
+  * @throws IOException trouble copying to HDFS
+  * @throws SliderException declared by the JAR lookup
+  */
+ public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName)
+     throws IOException, SliderException {
+   return submitFile(SliderUtils.findContainingJarOrFail(clazz),
+       tempPath, subdir, jarName);
+ }
+
+ /**
+  * Copy a local file into {@code tempPath/subdir} on the cluster filesystem,
+  * overwriting any existing destination, and describe it as a FILE local
+  * resource.
+  *
+  * @param localFile local file to upload
+  * @param tempPath parent path the subdirectory is resolved against
+  * @param subdir subdirectory under {@code tempPath}; created if needed
+  * @param destFileName destination filename
+  * @return the local resource ref
+  * @throws IOException trouble copying to HDFS
+  */
+ public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName)
+     throws IOException {
+   final Path targetDir = new Path(tempPath, subdir);
+   fileSystem.mkdirs(targetDir);
+   final Path target = new Path(targetDir, destFileName);
+   log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), target);
+   Path source = new Path(localFile.toString());
+   fileSystem.copyFromLocalFile(false, true, source, target);
+   // registered as FILE rather than ARCHIVE: archives get unpacked at the
+   // destination, which is not wanted for the uploaded file
+   return createAmResource(target, LocalResourceType.FILE);
+ }
+
+ /**
+  * Describe the dependency tarball ({@link #getDependencyTarGzip()}) as an
+  * ARCHIVE local resource and store it in the given map under
+  * {@link SliderKeys#SLIDER_DEPENDENCY_LOCALIZED_DIR_LINK}.
+  *
+  * @param providerResources
+  * the provider resource map to be updated
+  * @throws IOException
+  * trouble reading the tarball's status
+  * @throws BadClusterStateException declared; not thrown directly here
+  */
+ public void submitTarGzipAndUpdate(
+     Map<String, LocalResource> providerResources) throws IOException,
+     BadClusterStateException {
+   LocalResource tarball = createAmResource(getDependencyTarGzip(),
+       LocalResourceType.ARCHIVE);
+   providerResources.put(SliderKeys.SLIDER_DEPENDENCY_LOCALIZED_DIR_LINK, tarball);
+ }
+
+ /**
+ * Copy local file(s) to destination HDFS directory. If {@code localPath} is a
+ * local directory then all files matching the {@code filenameFilter}
+ * (optional) are copied, otherwise {@code filenameFilter} is ignored.
+ *
+ * @param localPath
+ * a local file or directory path
+ * @param filenameFilter
+ * if {@code localPath} is a directory then filenameFilter is used as
+ * a filter (if specified)
+ * @param destDir
+ * the destination HDFS directory where the file(s) should be copied
+ * @param fp
+ * file permissions of all the directories and files that will be
+ * created in this api
+ * @throws IOException
+ */
+ public void copyLocalFilesToHdfs(File localPath,
+ FilenameFilter filenameFilter, Path destDir, FsPermission fp)
+ throws IOException {
+ if (localPath == null || destDir == null) {
+ throw new IOException("Either localPath or destDir is null");
+ }
+ fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+ "000");
+ fileSystem.mkdirs(destDir, fp);
+ if (localPath.isDirectory()) {
+ // copy all local files under localPath to destDir (honoring filename
+ // filter if provided
+ File[] localFiles = localPath.listFiles(filenameFilter);
+ Path[] localFilePaths = new Path[localFiles.length];
+ int i = 0;
+ for (File localFile : localFiles) {
+ localFilePaths[i++] = new Path(localFile.getPath());
+ }
+ log.info("Copying {} files from {} to {}", i, localPath.toURI(),
+ destDir.toUri());
+ fileSystem.copyFromLocalFile(false, true, localFilePaths, destDir);
+ } else {
+ log.info("Copying file {} to {}", localPath.toURI(), destDir.toUri());
+ fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
+ destDir);
+ }
+ // set permissions for all the files created in the destDir
+ fileSystem.setPermission(destDir, fp);
+ }
+
+ public void copyLocalFileToHdfs(File localPath,
+ Path destPath, FsPermission fp)
+ throws IOException {
+ if (localPath == null || destPath == null) {
+ throw new IOException("Either localPath or destPath is null");
+ }
+ fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+ "000");
+ fileSystem.mkdirs(destPath.getParent(), fp);
+ log.info("Copying file {} to {}", localPath.toURI(), destPath.toUri());
+
+ fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
+ destPath);
+ // set file permissions of the destPath
+ fileSystem.setPermission(destPath, fp);
+ }
+
+ /**
+  * Copy a file from the cluster filesystem to a local file.
+  *
+  * @param hdfsPath source path on the cluster filesystem
+  * @param destFile local destination
+  * @throws IOException if either argument is null or the copy fails
+  */
+ public void copyHdfsFileToLocal(Path hdfsPath, File destFile)
+     throws IOException {
+   if (hdfsPath == null || destFile == null) {
+     throw new IOException("Either hdfsPath or destPath is null");
+   }
+   log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI());
+   fileSystem.copyToLocalFile(hdfsPath, new Path(destFile.getPath()));
+ }
+
+ /**
+  * Produce a textual listing of a filesystem directory: one line per entry,
+  * holding the entry's path and its length separated by a tab.
+  *
+  * @param path directory
+  * @return a listing, one entry to a line
+  * @throws IOException trouble listing the directory
+  */
+ public String listFSDir(Path path) throws IOException {
+   final StringBuilder listing = new StringBuilder();
+   for (FileStatus entry : fileSystem.listStatus(path)) {
+     listing.append(entry.getPath().toString());
+     listing.append("\t");
+     listing.append(entry.getLen());
+     listing.append("\n");
+   }
+   return listing.toString();
+ }
+
+ /**
+  * List all application instances persisted under the base application
+  * path. A child directory of the cluster directory counts as an instance
+  * only if it contains the file named by {@link Filenames#INTERNAL}; other
+  * directories are logged and skipped.
+  *
+  * @return a possibly empty map of application instance names (the last
+  * path element) to paths
+  * @throws IOException trouble with FS
+  */
+ public Map<String, Path> listPersistentInstances() throws IOException {
+   final FileSystem fs = getFileSystem();
+   final Path instancesRoot =
+       new Path(getBaseApplicationPath(), SliderKeys.CLUSTER_DIRECTORY);
+   log.debug("Looking for all persisted application at {}", instancesRoot.toString());
+   if (!fs.exists(instancesRoot)) {
+     // nothing has ever been created under the root
+     return new HashMap<String, Path>(0);
+   }
+   final FileStatus[] children = fs.listStatus(instancesRoot);
+   final Map<String, Path> found = new HashMap<String, Path>(children.length);
+   for (FileStatus entry : children) {
+     if (!entry.isDirectory()) {
+       continue;
+     }
+     Path candidate = entry.getPath();
+     // only directories holding the marker file are instances
+     if (fs.exists(new Path(candidate, Filenames.INTERNAL))) {
+       found.put(candidate.getName(), candidate);
+     } else {
+       log.info("Malformed cluster found at {}. It does not appear to be a valid persisted instance.",
+           candidate.toString());
+     }
+   }
+   return found;
+ }
+
+ /**
+  * Create a file and close it straight away without writing anything.
+  *
+  * @param path file to create
+  * @param overwrite overwrite flag passed to the filesystem's create call
+  * @throws IOException if the create call fails
+  */
+ public void touch(Path path, boolean overwrite) throws IOException {
+   FSDataOutputStream created = fileSystem.create(path, overwrite);
+   IOUtils.closeStream(created);
+ }
+
+ /**
+  * Write a string to a file as UTF-8.
+  *
+  * @param path file to write
+  * @param overwrite overwrite flag passed to the filesystem's create call
+  * @param data text to write
+  * @throws IOException if the file cannot be created, written or closed
+  */
+ public void cat(Path path, boolean overwrite, String data) throws IOException {
+   FSDataOutputStream out = null;
+   try {
+     out = fileSystem.create(path, overwrite);
+     byte[] bytes = data.getBytes(Charset.forName(UTF_8));
+     out.write(bytes);
+     // Close explicitly on the success path: IOUtils.closeStream() swallows
+     // close() failures, which would hide a failed flush of the data.
+     out.close();
+     out = null;
+   } finally {
+     // only reached with a live stream when something above failed
+     IOUtils.closeStream(out);
+   }
+ }
+
+ /**
+  * Read a whole file and decode it as UTF-8.
+  * The buffer is sized from the length in the file status, so the file must
+  * fit in a single byte array.
+  *
+  * @param path file to read
+  * @return the file contents as a string; empty for an empty file
+  * @throws IOException if the file cannot be opened or fully read
+  */
+ public String cat(Path path) throws IOException {
+   FileStatus status = fileSystem.getFileStatus(path);
+   byte[] b = new byte[(int) status.getLen()];
+   FSDataInputStream in = null;
+   try {
+     in = fileSystem.open(path);
+     // readFully, not read: one read() may return fewer bytes than asked
+     // for, and returns -1 at end of stream
+     in.readFully(b);
+     return new String(b, UTF_8);
+   } finally {
+     IOUtils.closeStream(in);
+   }
+ }
+
+ /**
+  * Turn a URI string into a path and verify that it exists in the cluster fs.
+  *
+  * @param uri uri to create the path from; must not be null
+  * @return the path
+  * @throws SliderException declared; not thrown directly here
+  * @throws IOException if the path does not exist (as a
+  * FileNotFoundException) or the existence check fails
+  */
+ public Path createPathThatMustExist(String uri) throws
+     SliderException, IOException {
+   final Path mustExist = new Path(Preconditions.checkNotNull(uri));
+   verifyPathExists(mustExist);
+   return mustExist;
+ }
+
+ /**
+  * Locate the application configuration file ({@link Filenames#APPCONF}) of
+  * a cluster and verify that it is present.
+  *
+  * @param clustername name of the cluster
+  * @return the path to the spec.
+  * @throws IOException IO problems
+  * @throws SliderException if the path isn't there
+  */
+ public Path locateInstanceDefinition(String clustername) throws IOException,
+     SliderException {
+   final Path appConf =
+       new Path(buildClusterDirPath(clustername), Filenames.APPCONF);
+   verifyClusterSpecExists(clustername, appConf);
+   return appConf;
+ }
+
+ /**
+  * Fail unless the cluster specification path refers to a file.
+  *
+  * @param clustername name of the cluster (for errors only)
+  * @param clusterSpecPath cluster specification path
+  * @throws IOException IO problems
+  * @throws SliderException if the cluster specification is not present
+  */
+ public void verifyClusterSpecExists(String clustername, Path clusterSpecPath)
+     throws IOException,
+     SliderException {
+   if (fileSystem.isFile(clusterSpecPath)) {
+     return;
+   }
+   log.debug("Missing specification file {}", clusterSpecPath);
+   throw UnknownApplicationInstanceException.unknownInstance(
+       clustername + "\n (definition not found at " + clusterSpecPath);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java
new file mode 100644
index 0000000..e5fa424
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.common.tools;
+
+import java.io.Closeable;
+
+/**
+ * A duration in milliseconds. This class can be used
+ * to count time, and to be polled to see if a time limit has
+ * passed.
+ * <p>
+ * Timestamps come from {@link #now()}, which is derived from
+ * {@link System#nanoTime()}: they are only meaningful relative to each
+ * other and may be negative. A value of 0 in both {@link #start} and
+ * {@link #finish} is treated as "never started", and a {@link #finish}
+ * of 0 as "not yet finished".
+ */
+public class Duration implements Closeable {
+  /** Timestamps in millis recorded by start() and finish(); 0 until set. */
+  public long start, finish;
+  /** Time limit in millis, as supplied to the constructor. */
+  public final long limit;
+
+  /**
+   * Create a duration instance with a limit of 0
+   */
+  public Duration() {
+    this(0);
+  }
+
+  /**
+   * Create a duration with a limit specified in millis
+   * @param limit duration in milliseconds
+   */
+  public Duration(long limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * Record the current time as the start time.
+   * @return self
+   */
+  public Duration start() {
+    start = now();
+    return this;
+  }
+
+  /**
+   * The close operation relays to {@link #finish()}.
+   * Implementing it lets a Duration be declared in a try-with-resources
+   * block, so that it is finished automatically when the block exits.
+   */
+  @Override
+  public final void close() {
+    finish();
+  }
+
+  /**
+   * Record the current time as the finish time.
+   */
+  public void finish() {
+    finish = now();
+  }
+
+  /**
+   * Get the current time.
+   * @return {@link System#nanoTime()} scaled down to milliseconds
+   */
+  protected long now() {
+    return System.nanoTime()/1000000;
+  }
+
+  /**
+   * Get the recorded interval.
+   * @return finish time minus start time, in milliseconds
+   */
+  public long getInterval() {
+    return finish - start;
+  }
+
+  /**
+   * return true if the limit has been exceeded
+   * @return true if the limit is not negative and the time elapsed since
+   * the recorded start is greater than it. A negative limit is never
+   * exceeded; a limit of 0 is exceeded as soon as the clock moves past
+   * the start time.
+   */
+  public boolean getLimitExceeded() {
+    return limit >= 0 && ((now() - start) > limit);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Duration");
+    // Both fields still at their initial 0: start() was never called.
+    // This has to be tested first, as 0 >= 0 would otherwise be reported
+    // as a finished duration of 0 millis.
+    boolean unstarted = start == 0 && finish == 0;
+    if (unstarted) {
+      builder.append(" unstarted;");
+    } else if (finish == 0 || finish < start) {
+      // finish is unset, or predates a later restart. Comparing against 0
+      // rather than testing "start > 0" keeps this correct when the
+      // nanoTime-derived timestamps are negative.
+      builder.append(" started but not yet finished;");
+    } else {
+      builder.append(" finished at ").append(getInterval()).append(" millis;");
+    }
+    if (limit > 0) {
+      builder.append(" limit: ").append(limit).append(" millis");
+      // A timer which never started cannot have run past its limit.
+      if (!unstarted && getLimitExceeded()) {
+        builder.append(" - exceeded");
+      }
+    }
+    return builder.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
new file mode 100644
index 0000000..64783b6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.common.tools;
+
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.SliderException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Hands out ports. If a set of candidate ports has been configured through
+ * {@link #setPortRange(String)}, ports are taken from that set; otherwise
+ * the choice is delegated to {@link SliderUtils#getOpenPort()}.
+ */
+public class PortScanner {
+  /** Highest valid TCP/UDP port number. */
+  private static final int MAX_PORT = 65535;
+  private static final Pattern NUMBER_RANGE =
+      Pattern.compile("^(\\d+)\\s*-\\s*(\\d+)$");
+  private static final Pattern SINGLE_NUMBER = Pattern.compile("^\\d+$");
+
+  /** Candidate ports in ascending order; null until a range is set. */
+  private List<Integer> remainingPortsToCheck;
+
+  public PortScanner() {
+  }
+
+  /**
+   * Set the candidate ports from a comma-separated list whose entries are
+   * either a single port ("8080") or an inclusive range ("8000-8010").
+   * Whitespace around entries is ignored, empty entries are skipped and
+   * duplicates are collapsed.
+   * @param input the port specification
+   * @throws BadConfigException if the input is null, an entry is malformed,
+   * a port is greater than 65535, a range ends before it starts, or no
+   * port at all is listed
+   */
+  public void setPortRange(String input) throws BadConfigException {
+    if (input == null) {
+      throw new BadConfigException("No ports found in range: " + input);
+    }
+    // a sorted set: de-duplicates and yields ascending order
+    Set<Integer> inputPorts = new TreeSet<Integer>();
+    // first split based on commas
+    for (String range : input.split(",")) {
+      String trimmed = range.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+      if (SINGLE_NUMBER.matcher(trimmed).matches()) {
+        inputPorts.add(parsePort(trimmed, range, input));
+        continue;
+      }
+      Matcher m = NUMBER_RANGE.matcher(trimmed);
+      if (m.matches()) {
+        int start = parsePort(m.group(1), range, input);
+        int end = parsePort(m.group(2), range, input);
+        if (end < start) {
+          throw new BadConfigException("End of port range is before start: "
+              + range + " in input: " + input);
+        }
+        // both bounds are capped at MAX_PORT, so this loop is bounded
+        for (int port = start; port <= end; port++) {
+          inputPorts.add(port);
+        }
+        continue;
+      }
+      throw new BadConfigException("Bad port range: " + range + " in input: "
+          + input);
+    }
+    if (inputPorts.isEmpty()) {
+      throw new BadConfigException("No ports found in range: " + input);
+    }
+    this.remainingPortsToCheck = new ArrayList<Integer>(inputPorts);
+  }
+
+  /**
+   * Convert a string of decimal digits into a port number.
+   * @param digits text already known to consist of digits only
+   * @param range the list entry being parsed (for errors only)
+   * @param input the complete specification (for errors only)
+   * @return the port number, in the range 0 to 65535
+   * @throws BadConfigException if the value is too large to be a port
+   */
+  private static int parsePort(String digits, String range, String input)
+      throws BadConfigException {
+    int port;
+    try {
+      port = Integer.parseInt(digits);
+    } catch (NumberFormatException e) {
+      // digits-only text can only fail by overflowing an int; report it as
+      // a configuration problem rather than leaking an unchecked exception
+      port = -1;
+    }
+    if (port < 0 || port > MAX_PORT) {
+      throw new BadConfigException("Bad port range: " + range + " in input: "
+          + input);
+    }
+    return port;
+  }
+
+  /**
+   * Get the candidate ports which have not yet been handed out.
+   * @return the live internal list, or null if no range has been set
+   */
+  public List<Integer> getRemainingPortsToCheck() {
+    return remainingPortsToCheck;
+  }
+
+  /**
+   * Get a port to use.
+   * @return a port from the configured candidates if a range has been set,
+   * otherwise the result of {@link SliderUtils#getOpenPort()}
+   * @throws SliderException if none of the configured candidates is available
+   * @throws IOException declared for the {@link SliderUtils#getOpenPort()} path
+   */
+  public int getAvailablePort() throws SliderException, IOException {
+    if (remainingPortsToCheck != null) {
+      return getAvailablePortViaPortArray();
+    } else {
+      return SliderUtils.getOpenPort();
+    }
+  }
+
+  /**
+   * Walk the remaining candidates in order and return the first one which
+   * {@link SliderUtils#isPortAvailable(int)} accepts, removing it from the
+   * list so that it is not offered again.
+   * @return the chosen port
+   * @throws SliderException if no candidate is available
+   */
+  private int getAvailablePortViaPortArray() throws SliderException {
+    Iterator<Integer> portsToCheck = this.remainingPortsToCheck.iterator();
+    while (portsToCheck.hasNext()) {
+      int portToCheck = portsToCheck.next();
+      if (SliderUtils.isPortAvailable(portToCheck)) {
+        portsToCheck.remove();
+        return portToCheck;
+      }
+    }
+
+    // NOTE(review): "{}" is an SLF4J-style placeholder. Confirm that
+    // SliderException expands it; if it formats with String.format the
+    // range will not appear in the message and "%s" is needed instead.
+    throw new SliderException(SliderExitCodes.EXIT_BAD_CONFIGURATION,
+        "No available ports found in configured range {}",
+        remainingPortsToCheck);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java
new file mode 100644
index 0000000..294f37e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.common.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/**
+ * The Slider view of the filesystem: a {@link CoreFileSystem} intended to
+ * carry the operations which manipulate ClusterDescription persistent
+ * state. At present it declares no members of its own beyond constructors
+ * which forward to the superclass.
+ */
+public class SliderFileSystem extends CoreFileSystem {
+
+  /**
+   * Create an instance from a configuration alone.
+   * @param configuration configuration handed to the superclass
+   * @throws IOException if the superclass constructor fails
+   */
+  public SliderFileSystem(Configuration configuration) throws IOException {
+    super(configuration);
+  }
+
+  /**
+   * Create an instance around an existing filesystem.
+   * @param fileSystem filesystem handed to the superclass
+   * @param configuration configuration handed to the superclass
+   */
+  public SliderFileSystem(FileSystem fileSystem, Configuration configuration) {
+    super(fileSystem, configuration);
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org