Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/18 03:51:25 UTC

git commit: Merge pull request #137 from tgravescs/sparkYarnJarsHdfsRebase

Updated Branches:
  refs/heads/branch-0.8 af98fbc93 -> e134ed5d6


Merge pull request #137 from tgravescs/sparkYarnJarsHdfsRebase

Allow Spark on YARN to be run from HDFS.

Allows spark.jar, app.jar, and log4j.properties to be placed in HDFS. The files may also live on a different HDFS cluster, in which case they are copied over. The client sets the correct permissions and places files in the public distributed cache when their permissions allow, so they can be reused across users. Also adds some error handling for missing arguments.

(cherry picked from commit f49ea28d25728e19e56b140a2f374631c94153bc)
Signed-off-by: Reynold Xin <rx...@apache.org>
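
For readers following the code, the key URI-handling rule in the new prepareLocalResources is: a path with no scheme is treated as a local file (matching Hadoop's convention), while any other URI is used as given and copied only when it lives on a different filesystem than the staging directory. A condensed sketch of that qualification step, with hypothetical paths in the comments:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object UriQualification {
      // Scheme-less paths are assumed local, mirroring prepareLocalResources.
      def qualify(localPath: String, conf: Configuration): URI = {
        val localURI = new URI(localPath)
        if (localURI.getScheme() == null) {
          // e.g. "/home/user/app.jar" becomes "file:/home/user/app.jar"
          FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toUri()
        } else {
          localURI  // e.g. "hdfs://other-nn:8020/jars/app.jar" is used as given
        }
      }
    }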


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e134ed5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e134ed5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e134ed5d

Branch: refs/heads/branch-0.8
Commit: e134ed5d670b82a183c060ad148676d2d5dd31b9
Parents: af98fbc
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Tue Nov 12 19:13:39 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Nov 17 18:51:19 2013 -0800

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |   1 +
 pom.xml                                         |   6 +
 project/SparkBuild.scala                        |   3 +-
 yarn/pom.xml                                    |  50 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |   2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 276 ++++++++++---------
 .../yarn/ClientDistributedCacheManager.scala    | 228 +++++++++++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      |  42 +--
 .../ClientDistributedCacheManagerSuite.scala    | 220 +++++++++++++++
 9 files changed, 655 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 2898af0..6fd1d0d 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -21,6 +21,7 @@ The assembled JAR will be something like this:
 # Preparations
 
 - Building a YARN-enabled assembly (see above).
+- The assembled JAR can be installed into HDFS or used locally.
 - Your application code must be packaged into a separate JAR file.
 
 If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}` file can be generated by running `sbt/sbt assembly`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
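
The one-line doc change above is the user-visible side of this patch: the assembly no longer has to sit on the submitting machine. A hedged sketch of installing it into HDFS through the Hadoop filesystem API follows; the paths and permission choice are illustrative, not prescribed by the patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.fs.permission.FsPermission

    object InstallAssembly {
      def main(args: Array[String]) {
        val fs = FileSystem.get(new Configuration())
        val local = new Path("assembly/target/spark-assembly.jar")    // hypothetical
        val shared = new Path("/user/spark/share/spark-assembly.jar") // hypothetical
        fs.copyFromLocalFile(false, true, local, shared)
        // World-readable files under world-executable directories qualify for
        // the public distributed cache (see ClientDistributedCacheManager below).
        fs.setPermission(shared, FsPermission.createImmutable(0644: Short))
      }
    }

SPARK_JAR can then be pointed at the hdfs:// location of the shared jar.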

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0ebcd2c..07213f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -356,6 +356,12 @@
         <version>3.1</version>
         <scope>test</scope>
       </dependency>
+     <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>1.8.5</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.scalacheck</groupId>
         <artifactId>scalacheck_2.9.3</artifactId>
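
The new mockito-all test dependency backs the ClientDistributedCacheManagerSuite added at the end of this commit; together with ScalaTest's MockitoSugar, stubbing a Hadoop FileSystem looks roughly like this (a minimal sketch mirroring the suite):

    import org.scalatest.FunSuite
    import org.scalatest.mock.MockitoSugar
    import org.mockito.Mockito.when
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    class ExampleSuite extends FunSuite with MockitoSugar {
      test("stubbing a Hadoop FileSystem") {
        val fs = mock[FileSystem]
        // A default FileStatus has a null path, so the stub is easy to spot.
        when(fs.getFileStatus(new Path("/tmp/x"))).thenReturn(new FileStatus())
        assert(fs.getFileStatus(new Path("/tmp/x")).getPath() === null)
      }
    }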

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4f07851..95a9ca9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -162,7 +162,8 @@ object SparkBuild extends Build {
       "org.scalatest" %% "scalatest" % "1.9.1" % "test",
       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
       "com.novocode" % "junit-interface" % "0.9" % "test",
-      "org.easymock" % "easymock" % "3.1" % "test"
+      "org.easymock" % "easymock" % "3.1" % "test",
+      "org.mockito" % "mockito-all" % "1.8.5" % "test"
     ),
     /* Workaround for issue #206 (fixed after SBT 0.11.0) */
     watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 5ad6422..d9168e3 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -61,6 +61,16 @@
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-ipc</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -106,6 +116,46 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <tasks>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
+                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variable and retry.">
+                  <condition>
+                    <not>
+                      <or>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
+                      </or>
+                    </not>
+                  </condition>
+                </fail>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <SPARK_HOME>${basedir}/..</SPARK_HOME>
+            <SPARK_TESTING>1</SPARK_TESTING>
+            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+          </environmentVariables>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c1a87d3..4302ef4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -349,7 +349,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     try {
       val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
       if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent()
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {
           logError("Staging directory is null")
           return
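
With the jar path no longer exported, the staging directory is handed to the ApplicationMaster directly via SPARK_YARN_STAGING_DIR. A minimal sketch of the cleanup this supports, using the system property and environment variable from the diff (pulled out of the ApplicationMaster here for illustration):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Delete the per-app staging dir (e.g. .sparkStaging/<appId>/ under the
    // submitter's home directory) unless the user asked to preserve it.
    def cleanupStagingDir(conf: Configuration) {
      val preserve = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
      val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
      if (!preserve && stagingDir != null) {
        FileSystem.get(conf).delete(new Path(stagingDir), true)
      }
    }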

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1a380ae..4e0e060 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,26 +17,31 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.net.{InetSocketAddress, URI}
+import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.client.YarnClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, Records}
+
 import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
 import scala.collection.JavaConversions._
+
 import org.apache.spark.Logging 
 import org.apache.spark.util.Utils
-import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.spark.deploy.SparkHadoopUtil
 
 class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
@@ -46,13 +51,14 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-  private var distFiles = None: Option[String]
-  private var distFilesTimeStamps = None: Option[String]
-  private var distFilesFileSizes = None: Option[String]
-  private var distArchives = None: Option[String]
-  private var distArchivesTimeStamps = None: Option[String]
-  private var distArchivesFileSizes = None: Option[String]
-  
+  private val SPARK_STAGING: String = ".sparkStaging"
+  private val distCacheMgr = new ClientDistributedCacheManager()
+
+  // staging directory is private! -> rwx------
+  val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
+  // app files are world readable and owner writable -> rw-r--r--
+  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) 
+
   def run() {
     init(yarnConf)
     start()
@@ -63,8 +69,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     verifyClusterResources(newApp)
     val appContext = createApplicationSubmissionContext(appId)
-    val localResources = prepareLocalResources(appId, ".sparkStaging")
-    val env = setupLaunchEnv(localResources)
+    val appStagingDir = getAppStagingDir(appId)
+    val localResources = prepareLocalResources(appStagingDir)
+    val env = setupLaunchEnv(localResources, appStagingDir)
     val amContainer = createContainerLaunchContext(newApp, localResources, env)
 
     appContext.setQueue(args.amQueue)
@@ -76,7 +83,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     monitorApplication(appId)
     System.exit(0)
   }
-  
+
+  def getAppStagingDir(appId: ApplicationId): String = {
+    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+  }
 
   def logClusterResourceDetails() {
     val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
@@ -116,73 +126,73 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     return appContext
   }
 
+  /*
+   * Check whether two file systems are the same.
+   */
+  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+    if (srcUri.getScheme() == null) {
+      return false
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false
+    }
+    var srcHost = srcUri.getHost()
+    var dstHost = dstUri.getHost()
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch {
+        case e: UnknownHostException =>
+          return false
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false
+    } else if (srcHost != null && dstHost == null) {
+      return false
+    }
+    // check for ports
+    if (srcUri.getPort() != dstUri.getPort()) {
+      return false
+    }
+    return true;
+  }
+
   /**
-   * Copy the local file into HDFS and configure to be distributed with the
-   * job via the distributed cache.
-   * If a fragment is specified the file will be referenced as that fragment.
+   * Copy the file into HDFS if needed.
    */
-  private def copyLocalFile(
+  private def copyRemoteFile(
       dstDir: Path,
-      resourceType: LocalResourceType,
       originalPath: Path,
       replication: Short,
-      localResources: HashMap[String,LocalResource],
-      fragment: String,
-      appMasterOnly: Boolean = false): Unit = {
+      setPerms: Boolean = false): Path = {
     val fs = FileSystem.get(conf)
-    val newPath = new Path(dstDir, originalPath.getName())
-    logInfo("Uploading " + originalPath + " to " + newPath)
-    fs.copyFromLocalFile(false, true, originalPath, newPath)
-    fs.setReplication(newPath, replication);
-    val destStatus = fs.getFileStatus(newPath)
-
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(resourceType)
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
-    amJarRsrc.setTimestamp(destStatus.getModificationTime())
-    amJarRsrc.setSize(destStatus.getLen())
-    var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName());
-    if ((fragment == null) || (fragment.isEmpty())){
-      localResources(originalPath.getName()) = amJarRsrc
-    } else {
-      localResources(fragment) = amJarRsrc
-      pathURI = new URI(newPath.toString() + "#" + fragment);
-    }
-    val distPath = pathURI.toString()
-    if (appMasterOnly == true) return
-    if (resourceType == LocalResourceType.FILE) {
-      distFiles match {
-        case Some(path) =>
-          distFilesFileSizes = Some(distFilesFileSizes.get + "," + 
-            destStatus.getLen().toString())
-          distFilesTimeStamps = Some(distFilesTimeStamps.get + "," + 
-            destStatus.getModificationTime().toString())
-          distFiles = Some(path + "," + distPath)
-        case _ => 
-          distFilesFileSizes = Some(destStatus.getLen().toString())
-          distFilesTimeStamps = Some(destStatus.getModificationTime().toString())
-          distFiles = Some(distPath)
-      }
-    } else {
-      distArchives match {
-        case Some(path) =>
-          distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," +
-            destStatus.getModificationTime().toString())
-          distArchivesFileSizes = Some(distArchivesFileSizes.get + "," + 
-            destStatus.getLen().toString())
-          distArchives = Some(path + "," + distPath)
-        case _ => 
-          distArchivesTimeStamps = Some(destStatus.getModificationTime().toString())
-          distArchivesFileSizes = Some(destStatus.getLen().toString())
-          distArchives = Some(distPath)
-      }
-    }
+    val remoteFs = originalPath.getFileSystem(conf);
+    var newPath = originalPath
+    if (! compareFs(remoteFs, fs)) {
+      newPath = new Path(dstDir, originalPath.getName())
+      logInfo("Uploading " + originalPath + " to " + newPath)
+      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
+      fs.setReplication(newPath, replication);
+      if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+    } 
+    // resolve any symlinks in the URI path so that using a "current" symlink
+    // to point to a specific version shows the specific version
+    // in the distributed cache configuration
+    val qualPath = fs.makeQualified(newPath)
+    val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+    val destPath = fc.resolvePath(qualPath)
+    destPath
   }
 
-  def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = {
+  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system
+    // Upload Spark and the application JAR to the remote file system if necessary
     // Add them as local resources to the AM
     val fs = FileSystem.get(conf)
 
@@ -193,9 +203,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         System.exit(1)
       }
     }
-
-    val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/"
-    val dst = new Path(fs.getHomeDirectory(), pathSuffix)
+    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -203,55 +211,65 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       dstFs.addDelegationTokens(delegTokenRenewer, credentials);
     }
     val localResources = HashMap[String, LocalResource]()
+    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+
+    if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
+      logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
+      System.exit(1)
+    }
 
-    Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
+    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, 
+      Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
-        val src = new Path(localPath)
-        val newPath = new Path(dst, destName)
-        logInfo("Uploading " + src + " to " + newPath)
-        fs.copyFromLocalFile(false, true, src, newPath)
-        fs.setReplication(newPath, replication);
-        val destStatus = fs.getFileStatus(newPath)
-
-        val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-        amJarRsrc.setType(LocalResourceType.FILE)
-        amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
-        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
-        amJarRsrc.setTimestamp(destStatus.getModificationTime())
-        amJarRsrc.setSize(destStatus.getLen())
-        localResources(destName) = amJarRsrc
+        var localURI = new URI(localPath)
+        // if not specified, assume these are in the local filesystem to keep behavior like Hadoop
+        if (localURI.getScheme() == null) {
+          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
+        }
+        val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
+        val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          destName, statCache)
       }
     }
 
     // handle any add jars
     if ((args.addJars != null) && (!args.addJars.isEmpty())){
       args.addJars.split(',').foreach { case file: String =>
-        val tmpURI = new URI(file)
-        val tmp = new Path(tmpURI)
-        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
-          tmpURI.getFragment(), true)
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          linkname, statCache, true)
       }
     }
 
     // handle any distributed cache files
     if ((args.files != null) && (!args.files.isEmpty())){
       args.files.split(',').foreach { case file: String =>
-        val tmpURI = new URI(file)
-        val tmp = new Path(tmpURI)
-        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
-          tmpURI.getFragment())
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          linkname, statCache)
       }
     }
 
     // handle any distributed cache archives
     if ((args.archives != null) && (!args.archives.isEmpty())) {
       args.archives.split(',').foreach { case file:String =>
-        val tmpURI = new URI(file)
-        val tmp = new Path(tmpURI)
-        copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication, 
-          localResources, tmpURI.getFragment())
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, 
+          linkname, statCache)
       }
     }
 
@@ -259,44 +277,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     return localResources
   }
   
-  def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
+  def setupLaunchEnv(
+      localResources: HashMap[String, LocalResource], 
+      stagingDir: String): HashMap[String, String] = {
     logInfo("Setting up the launch environment")
-    val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
+    val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
 
     val env = new HashMap[String, String]()
 
     Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
-    env("SPARK_YARN_JAR_PATH") = 
-      localResources("spark.jar").getResource().getScheme.toString() + "://" +
-      localResources("spark.jar").getResource().getFile().toString()
-    env("SPARK_YARN_JAR_TIMESTAMP") =  localResources("spark.jar").getTimestamp().toString()
-    env("SPARK_YARN_JAR_SIZE") =  localResources("spark.jar").getSize().toString()
-
-    env("SPARK_YARN_USERJAR_PATH") =
-      localResources("app.jar").getResource().getScheme.toString() + "://" +
-      localResources("app.jar").getResource().getFile().toString()
-    env("SPARK_YARN_USERJAR_TIMESTAMP") =  localResources("app.jar").getTimestamp().toString()
-    env("SPARK_YARN_USERJAR_SIZE") =  localResources("app.jar").getSize().toString()
-
-    if (log4jConfLocalRes != null) {
-      env("SPARK_YARN_LOG4J_PATH") =
-        log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString()
-      env("SPARK_YARN_LOG4J_TIMESTAMP") =  log4jConfLocalRes.getTimestamp().toString()
-      env("SPARK_YARN_LOG4J_SIZE") =  log4jConfLocalRes.getSize().toString()
-    }
+    env("SPARK_YARN_STAGING_DIR") = stagingDir
 
     // set the environment variables to be passed on to the Workers
-    if (distFiles != None) {
-      env("SPARK_YARN_CACHE_FILES") = distFiles.get
-      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
-      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get
-    }
-    if (distArchives != None) {
-      env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get
-      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get
-      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get
-    }
+    distCacheMgr.setDistFilesEnv(env)
+    distCacheMgr.setDistArchivesEnv(env)
 
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
@@ -365,6 +360,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
     }
 
+    if (args.userClass == null) {
+      logError("Error: You must specify a user class!")
+      System.exit(1)
+    }
+
     val commands = List[String](javaCommand + 
       " -server " +
       JAVA_OPTS +
@@ -432,6 +432,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 }
 
 object Client {
+  val SPARK_JAR: String = "spark.jar"
+  val APP_JAR: String = "app.jar"
+  val LOG4J_PROP: String = "log4j.properties"
+
   def main(argStrings: Array[String]) {
     // Set an env variable indicating we are running in YARN mode.
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
@@ -453,22 +457,22 @@ object Client {
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-        Path.SEPARATOR + "log4j.properties")
+        Path.SEPARATOR + LOG4J_PROP)
     }
     // normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-        Path.SEPARATOR + "app.jar")
+        Path.SEPARATOR + APP_JAR)
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-      Path.SEPARATOR + "spark.jar")
+      Path.SEPARATOR + SPARK_JAR)
     Client.populateHadoopClasspath(conf, env)
 
     if (!userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-        Path.SEPARATOR + "app.jar")
+        Path.SEPARATOR + APP_JAR)
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
       Path.SEPARATOR + "*")
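
The heart of the Client change is deciding when a copy is needed at all: compareFs canonicalizes hostnames and compares scheme, host, and port, so a jar that already sits on the destination cluster is referenced in place. A usage sketch, assuming a Configuration named conf in scope and treating the private compareFs as callable for illustration:

    // If both URIs resolve to the same cluster, copyRemoteFile skips the copy
    // and only qualifies the path (resolving symlinks through FileContext).
    val srcPath = new Path("hdfs://nn1.example.com:8020/jars/app.jar") // hypothetical
    val srcFs = srcPath.getFileSystem(conf)
    val dstFs = FileSystem.get(conf) // the filesystem holding the staging dir
    val needsCopy = !compareFs(srcFs, dstFs)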

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
new file mode 100644
index 0000000..07686fe
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import org.apache.spark.Logging 
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.LinkedHashMap
+import scala.collection.mutable.Map
+
+
+/** Client side methods to setup the Hadoop distributed cache */
+class ClientDistributedCacheManager() extends Logging {
+  private val distCacheFiles: Map[String, Tuple3[String, String, String]] = 
+    LinkedHashMap[String, Tuple3[String, String, String]]()
+  private val distCacheArchives: Map[String, Tuple3[String, String, String]] = 
+    LinkedHashMap[String, Tuple3[String, String, String]]()
+
+
+  /**
+   * Add a resource to the list of distributed cache resources. This list can
+   * be sent to the ApplicationMaster and possibly the workers so that they can
+   * be downloaded into the Hadoop distributed cache for use by this application.
+   * Adds the LocalResource to the localResources HashMap passed in and saves
+   * the stats of the resources so they can be sent to the workers and verified.
+   *
+   * @param fs FileSystem
+   * @param conf Configuration
+   * @param destPath path to the resource
+   * @param localResources localResource hashMap to insert the resource into
+   * @param resourceType LocalResourceType 
+   * @param link link presented in the distributed cache to the destination
+   * @param statCache cache to store the file/directory stats 
+   * @param appMasterOnly Whether to only add the resource to the app master
+   */
+  def addResource(
+      fs: FileSystem,
+      conf: Configuration,
+      destPath: Path, 
+      localResources: HashMap[String, LocalResource],
+      resourceType: LocalResourceType,
+      link: String,
+      statCache: Map[URI, FileStatus],
+      appMasterOnly: Boolean = false) = {
+    val destStatus = fs.getFileStatus(destPath)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(resourceType)
+    val visibility = getVisibility(conf, destPath.toUri(), statCache)
+    amJarRsrc.setVisibility(visibility)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+    localResources(link) = amJarRsrc
+    
+    if (appMasterOnly == false) {
+      val uri = destPath.toUri()
+      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
+      if (resourceType == LocalResourceType.FILE) {
+        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      } else {
+        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      }
+    }
+  }
+
+  /**
+   * Adds the necessary cache file env variables to the env passed in
+   * @param env
+   */
+  def setDistFilesEnv(env: Map[String, String]) = {
+    val (keys, tupleValues) = distCacheFiles.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Adds the necessary cache archive env variables to the env passed in
+   * @param env
+   */
+  def setDistArchivesEnv(env: Map[String, String]) = {
+    val (keys, tupleValues) = distCacheArchives.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Returns the local resource visibility depending on the cache file permissions
+   * @param conf
+   * @param uri
+   * @param statCache
+   * @return LocalResourceVisibility
+   */
+  def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+      LocalResourceVisibility = {
+    if (isPublic(conf, uri, statCache)) {
+      return LocalResourceVisibility.PUBLIC 
+    } 
+    return LocalResourceVisibility.PRIVATE
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
+   * or not.
+   * @param conf
+   * @param uri
+   * @param statCache
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+    val fs = FileSystem.get(uri, conf)
+    val current = new Path(uri.getPath())
+    //the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+      return false
+    }
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+  }
+
+  /**
+   * Returns true if all ancestors of the specified path have the 'execute'
+   * permission set for all users (i.e. that other users can traverse
+   * the directory hierarchy to the given path)
+   * @param fs
+   * @param path
+   * @param statCache
+   * @return true if all ancestors have the 'execute' permission set for all users
+   */
+  def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, 
+      statCache: Map[URI, FileStatus]): Boolean =  {
+    var current = path
+    while (current != null) {
+      //the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+        return false
+      }
+      current = current.getParent()
+    }
+    return true
+  }
+
+  /**
+   * Checks for a given path whether the 'other' permissions on it
+   * imply the permission in the passed FsAction
+   * @param fs
+   * @param path
+   * @param action
+   * @param statCache
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  def checkPermissionOfOther(fs: FileSystem, path: Path,
+      action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+    val status = getFileStatus(fs, path.toUri(), statCache);
+    val perms = status.getPermission()
+    val otherAction = perms.getOtherAction()
+    if (otherAction.implies(action)) {
+      return true;
+    }
+    return false
+  }
+
+  /**
+   * Checks whether the given uri exists in the cache; if it does, it
+   * returns the existing FileStatus, otherwise it stats the uri, stores
+   * the result in the cache, and returns the FileStatus.
+   * @param fs
+   * @param uri
+   * @param statCache
+   * @return FileStatus
+   */
+  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+    val stat = statCache.get(uri) match {
+      case Some(existstat) => existstat
+      case None => 
+        val newStat = fs.getFileStatus(new Path(uri))
+        statCache.put(uri, newStat)
+        newStat
+    }
+    return stat
+  }
+}
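
The manager is used in two phases on the client: addResource registers each file and records its (size, timestamp, visibility) triple keyed by cache URI, and the setDist*Env calls then serialize those records into the launch environment. A condensed wiring sketch, assuming the fs, conf, destPath, localResources, and env values from Client.run:

    // Register an uploaded jar under the link name "app.jar".
    distCacheMgr.addResource(fs, conf, destPath, localResources,
      LocalResourceType.FILE, "app.jar", statCache)

    // Serialize everything registered so far into the AM/worker environment:
    // SPARK_YARN_CACHE_FILES, *_TIME_STAMPS, *_FILE_SIZES, *_VISIBILITIES.
    distCacheMgr.setDistFilesEnv(env)
    distCacheMgr.setDistArchivesEnv(env)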

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index ba352da..7a66532 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -142,11 +142,12 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String) = {
+      size: String, 
+      vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
     amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
+    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
     amJarRsrc.setTimestamp(timestamp.toLong)
     amJarRsrc.setSize(size.toLong)
@@ -158,44 +159,14 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     logInfo("Preparing Local resources")
     val localResources = HashMap[String, LocalResource]()
     
-    // Spark JAR
-    val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    sparkJarResource.setType(LocalResourceType.FILE)
-    sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
-    sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
-      new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
-    sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
-    sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
-    localResources("spark.jar") = sparkJarResource
-    // User JAR
-    val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    userJarResource.setType(LocalResourceType.FILE)
-    userJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
-    userJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
-      new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
-    userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
-    userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
-    localResources("app.jar") = userJarResource
-
-    // Log4j conf - if available
-    if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
-      val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-      log4jConfResource.setType(LocalResourceType.FILE)
-      log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION)
-      log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI(
-        new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
-      log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
-      log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
-      localResources("log4j.properties") = log4jConfResource
-    }
-
     if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
       val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
       val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
       val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
       for( i <- 0 to distFiles.length - 1) {
         setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i))
+          fileSizes(i), visibilities(i))
       }
     }
 
@@ -203,9 +174,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
       val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
       for( i <- 0 to distArchives.length - 1) {
         setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
-          timeStamps(i), fileSizes(i))
+          timeStamps(i), fileSizes(i), visibilities(i))
       }
     }
     

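On the worker side the same information arrives as four parallel comma-separated lists, one entry per cached file; index i of every list describes the same resource. A sketch of the wire format, with hypothetical values in the comments:

    // SPARK_YARN_CACHE_FILES              = "hdfs://nn/jars/a.jar#a.jar,hdfs://nn/jars/b.jar#b"
    // SPARK_YARN_CACHE_FILES_TIME_STAMPS  = "1384300000000,1384300001000"
    // SPARK_YARN_CACHE_FILES_FILE_SIZES   = "1024,2048"
    // SPARK_YARN_CACHE_FILES_VISIBILITIES = "PUBLIC,PRIVATE"
    val files = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
    val stamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
    val sizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
    val vis = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
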
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e134ed5d/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
new file mode 100644
index 0000000..c0a2af0
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI;
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito.when
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+
+class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
+
+  class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
+    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): 
+        LocalResourceVisibility = {
+      return LocalResourceVisibility.PRIVATE
+    }
+  }
+  
+  test("test getFileStatus empty") {
+    val distMgr = new ClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val uri = new URI("/tmp/testing")
+    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val stat = distMgr.getFileStatus(fs, uri, statCache)
+    assert(stat.getPath() === null)
+  }
+
+  test("test getFileStatus cached") {
+    val distMgr = new ClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val uri = new URI("/tmp/testing")
+    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
+    val stat = distMgr.getFileStatus(fs, uri, statCache)
+    assert(stat.getPath().toString() === "/tmp/testing")
+  }
+
+  test("test addResource") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", 
+      statCache, false)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 0)
+    assert(resource.getSize() === 0)
+    assert(resource.getType() === LocalResourceType.FILE)
+
+    val env = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env)
+    assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
+    assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
+    assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+
+    // add another one and verify both are there and the order is correct
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing2"))
+    val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
+    when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
+    distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", 
+      statCache, false)
+    val resource2 = localResources("link2")
+    assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
+    assert(resource2.getTimestamp() === 10)
+    assert(resource2.getSize() === 20)
+    assert(resource2.getType() === LocalResourceType.FILE)
+
+    val env2 = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env2)
+    val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+    val files = env2("SPARK_YARN_CACHE_FILES").split(',') 
+    val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+    val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+    assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(timestamps(0) === "0")
+    assert(sizes(0) === "0")
+    assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
+
+    assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
+    assert(timestamps(1) === "10")
+    assert(sizes(1) === "20")
+    assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
+  }
+
+  test("test addResource link null") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+    intercept[Exception] {
+      distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, 
+        statCache, false)
+    }
+    assert(localResources.get("link") === None)
+    assert(localResources.size === 0)
+  }
+
+  test("test addResource appmaster only") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", 
+      statCache, true)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 10)
+    assert(resource.getSize() === 20)
+    assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+    val env = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+  }
+
+  test("test addResource archive") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", 
+      statCache, false)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 10)
+    assert(resource.getSize() === 20)
+    assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+    val env = new HashMap[String, String]()
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+    distMgr.setDistFilesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+  }
+
+
+}