You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/27 17:34:18 UTC

samza git commit: SAMZA-1143; Universal config support for localized resource

Repository: samza
Updated Branches:
  refs/heads/master 9d52e996b -> e4cfeebeb


SAMZA-1143; Universal config support for localized resource

More details in https://issues.apache.org/jira/browse/SAMZA-1143

Tests: ./gradlew clean check successful and all unit tests passed

Author: Fred Ji <fj...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #90 from fredji97/universalLocalizer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4cfeebe
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4cfeebe
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4cfeebe

Branch: refs/heads/master
Commit: e4cfeebebf3ec94ca3b5aca2e4aed10e77db38ae
Parents: 9d52e99
Author: Fred Ji <fj...@linkedin.com>
Authored: Mon Mar 27 10:34:12 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Mar 27 10:34:12 2017 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/FileSystemImplConfig.java    |  86 +++++++++
 .../samza/job/yarn/LocalizerResourceConfig.java | 102 +++++++++++
 .../job/yarn/LocalizerResourceException.java    |  72 ++++++++
 .../samza/job/yarn/LocalizerResourceMapper.java | 101 +++++++++++
 .../job/yarn/YarnClusterResourceManager.java    |   8 +-
 .../samza/job/yarn/YarnContainerRunner.java     |  26 ++-
 .../job/yarn/YarnResourceManagerFactory.java    |   1 -
 .../apache/samza/job/yarn/ClientHelper.scala    |  21 ++-
 .../apache/samza/job/yarn/YarnJobFactory.scala  |  11 +-
 .../job/yarn/TestFileSystemImplConfig.java      |  75 ++++++++
 .../job/yarn/TestLocalizerResourceConfig.java   | 125 +++++++++++++
 .../job/yarn/TestLocalizerResourceMapper.java   | 174 +++++++++++++++++++
 .../samza/job/yarn/TestYarnJobFactory.java      |  48 +++++
 .../samza/job/yarn/TestYarnJobFactory.scala     |  45 -----
 14 files changed, 836 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
new file mode 100644
index 0000000..8e79104
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Accessor for the Samza config entries named fs.&lt;scheme&gt;.impl (e.g. fs.http.impl),
+ * each of which holds the file system implementation class name for one scheme.
+ */
+public class FileSystemImplConfig {
+  private static final Logger log = LoggerFactory.getLogger(FileSystemImplConfig.class);
+  private static final String FS_IMPL_PREFIX = "fs.";
+  private static final String FS_IMPL_SUFFIX = ".impl";
+  private static final String FS_IMPL = "fs.%s.impl";
+
+  private final Config config;
+
+  public FileSystemImplConfig(final Config config) {
+    if (config == null) {
+      throw new IllegalArgumentException("config cannot be null");
+    }
+    this.config = config;
+  }
+
+  /**
+   * Collects the scheme of every fs.&lt;scheme&gt;.impl key found in the config.
+   * @return the schemes (e.g. http, myscheme), in the iteration order of the config key set
+   */
+  public List<String> getSchemes() {
+    final int suffixLength = FS_IMPL_SUFFIX.length();
+    final List<String> result = new ArrayList<String>();
+    for (String strippedKey : config.subset(FS_IMPL_PREFIX, true).keySet()) {
+      if (strippedKey.endsWith(FS_IMPL_SUFFIX)) {
+        result.add(strippedKey.substring(0, strippedKey.length() - suffixLength));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Builds the config key fs.&lt;scheme&gt;.impl for the given scheme.
+   * @param scheme scheme name, such as http, hdfs, myscheme
+   * @return the key fs.&lt;scheme&gt;.impl
+   */
+  public String getFsImplKey(final String scheme) {
+    return String.format(FS_IMPL, scheme);
+  }
+
+  /**
+   * Looks up the file system class name configured under fs.&lt;scheme&gt;.impl.
+   * @param scheme scheme name, such as http, hdfs, myscheme
+   * @return fully qualified class name of the file system for &lt;scheme&gt;
+   * @throws LocalizerResourceException if the key is missing or its value is empty
+   */
+  public String getFsImplClassName(final String scheme) {
+    final String key = getFsImplKey(scheme);
+    final String className = config.get(key);
+    if (!StringUtils.isEmpty(className)) {
+      return className;
+    }
+    throw new LocalizerResourceException(key + " does not have configured class implementation");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java
new file mode 100644
index 0000000..ca94783
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.samza.config.Config;
+
+/**
+ * LocalizerResourceConfig is intended to manage/fetch the config values
+ * for the yarn localizer resource(s) from the configuration.
+ *
+ * There are 4 config values
+ *   yarn.resources.&lt;resourceName&gt;.path
+ *     (Required) The path for fetching the resource for localization,
+ *     e.g. http://hostname.com/test.
+ *   yarn.resources.&lt;resourceName&gt;.local.name
+ *     (Optional) The local name used for the localized resource.
+ *     If not set, the default one will be the &lt;resourceName&gt; from the config key.
+ *   yarn.resources.&lt;resourceName&gt;.local.type
+ *     (Optional) The valid value is a string format of {@link LocalResourceType}:
+ *     ARCHIVE, FILE, PATTERN (upper-cased before parsing, so lower case is accepted).
+ *     If not set, the default value is FILE.
+ *   yarn.resources.&lt;resourceName&gt;.local.visibility
+ *     (Optional) The valid value is a string format of {@link LocalResourceVisibility}:
+ *     PUBLIC, PRIVATE, or APPLICATION (upper-cased before parsing, so lower case is accepted).
+ *     If not set, the default value is APPLICATION.
+ */
+public class LocalizerResourceConfig {
+  private static final String RESOURCE_PREFIX = "yarn.resources.";
+  private static final String PATH_SUFFIX = ".path";
+  private static final String RESOURCE_PATH = "yarn.resources.%s.path";
+  private static final String RESOURCE_LOCAL_NAME = "yarn.resources.%s.local.name";
+  private static final String RESOURCE_LOCAL_TYPE = "yarn.resources.%s.local.type";
+  private static final String RESOURCE_LOCAL_VISIBILITY = "yarn.resources.%s.local.visibility";
+  private static final String DEFAULT_RESOURCE_LOCAL_TYPE = "FILE";
+  private static final String DEFAULT_RESOURCE_LOCAL_VISIBILITY = "APPLICATION";
+
+  private final Config config;
+  /** Wraps the given job config; throws IllegalArgumentException when config is null. */
+  public LocalizerResourceConfig(final Config config) {
+    if (null == config) {
+      throw new IllegalArgumentException("config cannot be null");
+    }
+    this.config = config;
+  }
+  /** Returns the &lt;resourceName&gt; of every yarn.resources.&lt;resourceName&gt;.path key present in the config. */
+  public List<String> getResourceNames() {
+    Config subConfig = config.subset(RESOURCE_PREFIX, true);
+    List<String> resourceNames = new ArrayList<String>();
+    for (String key : subConfig.keySet()) {
+      if (key.endsWith(PATH_SUFFIX)) {
+        resourceNames.add(key.substring(0, key.length() - PATH_SUFFIX.length()));
+      }
+    }
+    return resourceNames;
+  }
+  /** Returns the required source path of the resource; throws LocalizerResourceException if it is missing or empty. */
+  public Path getResourcePath(final String resourceName) {
+    String pathStr = config.get(String.format(RESOURCE_PATH, resourceName));
+    if (StringUtils.isEmpty(pathStr)) {
+      throw new LocalizerResourceException("resource path is required but not defined in config for resource " + resourceName);
+    }
+    return new Path(pathStr);
+  }
+  /** Returns the configured local type (default FILE); Locale.ROOT keeps the upper-casing independent of the JVM locale. */
+  public LocalResourceType getResourceLocalType(final String resourceName) {
+    String typeStr = config.get(String.format(RESOURCE_LOCAL_TYPE, resourceName), DEFAULT_RESOURCE_LOCAL_TYPE);
+    return LocalResourceType.valueOf(typeStr.toUpperCase(java.util.Locale.ROOT));
+  }
+  /** Returns the configured visibility (default APPLICATION); Locale.ROOT keeps the upper-casing locale-independent. */
+  public LocalResourceVisibility getResourceLocalVisibility(final String resourceName) {
+    String visibilityStr = config.get(String.format(RESOURCE_LOCAL_VISIBILITY, resourceName), DEFAULT_RESOURCE_LOCAL_VISIBILITY);
+    return LocalResourceVisibility.valueOf(visibilityStr.toUpperCase(java.util.Locale.ROOT));
+  }
+  /** Returns the name the resource gets once localized, falling back to the resource name itself when not configured. */
+  public String getResourceLocalName(final String resourceName) {
+    String name = config.get(String.format(RESOURCE_LOCAL_NAME, resourceName), resourceName);
+    return name;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java
new file mode 100644
index 0000000..0df6903
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+/** Unchecked exception reporting a problem with a localizer resource or its configuration. */
+public class LocalizerResourceException extends RuntimeException {
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with {@code null}
+   * as its error detail message.
+   */
+  public LocalizerResourceException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with the specified detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LocalizerResourceException(String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with the specified detail message
+   * and cause.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   *
+   * @param cause
+   *        The cause (which is saved for later retrieval by the
+   *        {@link #getCause()} method).  (A null value is permitted,
+   *        and indicates that the cause is nonexistent or unknown.)
+   */
+  public LocalizerResourceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with the specified cause and a
+   * detail message of {@code (cause==null ? null : cause.toString())}
+   *
+   * @param cause
+   *        The cause (which is saved for later retrieval by the
+   *        {@link #getCause()} method).  (A null value is permitted,
+   *        and indicates that the cause is nonexistent or unknown.)
+   */
+  public LocalizerResourceException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java
new file mode 100644
index 0000000..6dddb0a
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A universal approach to generate the local resource map, which can be put in a ContainerLaunchContext directly.
+ */
+public class LocalizerResourceMapper {
+  private static final Logger log = LoggerFactory.getLogger(LocalizerResourceMapper.class);
+
+  private final YarnConfiguration yarnConfiguration; //yarn configurations
+  private final LocalizerResourceConfig resourceConfig;
+  private final Map<String, LocalResource> localResourceMap;
+  /** Builds the whole resource map eagerly, so a resource that cannot be resolved makes construction fail. */
+  public LocalizerResourceMapper(LocalizerResourceConfig resourceConfig, YarnConfiguration yarnConfiguration) {
+    this.yarnConfiguration = yarnConfiguration;
+    this.resourceConfig = resourceConfig;
+    this.localResourceMap = buildResourceMapping();
+  }
+  /** Creates one LocalResource per configured resource name, keyed by the resource's local name. */
+  private Map<String, LocalResource> buildResourceMapping() {
+    ImmutableMap.Builder<String, LocalResource> localResourceMapBuilder = ImmutableMap.builder();
+
+    List<String> resourceNames = resourceConfig.getResourceNames();
+    for (String resourceName : resourceNames) {
+      String resourceLocalName = resourceConfig.getResourceLocalName(resourceName);
+      LocalResourceType resourceType = resourceConfig.getResourceLocalType(resourceName);
+      LocalResourceVisibility resourceVisibility = resourceConfig.getResourceLocalVisibility(resourceName);
+      Path resourcePath = resourceConfig.getResourcePath(resourceName);
+
+      LocalResource localResource = createLocalResource(resourcePath, resourceType, resourceVisibility);
+
+      localResourceMapBuilder.put(resourceLocalName, localResource);
+      log.info("preparing local resource: {}", resourceLocalName);
+    }
+
+    return localResourceMapBuilder.build();
+  }
+  /** Fills a LocalResource from the file status (size, timestamp) of resourcePath; throws LocalizerResourceException on failure. */
+  private LocalResource createLocalResource(Path resourcePath, LocalResourceType resourceType, LocalResourceVisibility resourceVisibility) {
+    LocalResource localResource = Records.newRecord(LocalResource.class);
+    URL resourceUrl = ConverterUtils.getYarnUrlFromPath(resourcePath);
+    try {
+      FileStatus resourceFileStatus = resourcePath.getFileSystem(yarnConfiguration).getFileStatus(resourcePath);
+
+      if (null == resourceFileStatus) {
+        throw new LocalizerResourceException("Check getFileStatus implementation. getFileStatus gets unexpected null for resourcePath " + resourcePath);
+      }
+
+      localResource.setResource(resourceUrl);
+      log.info("setLocalizerResource for {}", resourceUrl);
+      localResource.setSize(resourceFileStatus.getLen());
+      localResource.setTimestamp(resourceFileStatus.getModificationTime());
+      localResource.setType(resourceType);
+      localResource.setVisibility(resourceVisibility);
+      return localResource;
+    } catch (IOException ioe) {
+      log.error("IO Exception when accessing the resource file status from the filesystem: " + resourcePath, ioe);
+      // attach the IOException as the cause so callers that wrap or log this exception keep the root failure
+      throw new LocalizerResourceException("IO Exception when accessing the resource file status from the filesystem: " + resourcePath, ioe);
+    }
+  }
+  /** Returns the immutable mapping from local resource name to LocalResource that was built at construction time. */
+  public Map<String, LocalResource> getResourceMap() {
+    return ImmutableMap.copyOf(localResourceMap);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index c1b71bb..04c78be 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -36,12 +36,10 @@ import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.CommandBuilder;
-import org.apache.samza.job.yarn.YarnContainer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -120,6 +118,12 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     hConfig = new YarnConfiguration();
     hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
 
+    // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
+    FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
+    fsImplConfig.getSchemes().forEach(
+        scheme -> hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
+    );
+
     MetricsRegistryMap registry = new MetricsRegistryMap();
     metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index 181102b..c45fc7f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -43,10 +43,12 @@ import org.apache.samza.job.CommandBuilder;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * A Helper class to run container processes on Yarn. This encapsulates quite a bit of YarnContainer
@@ -112,11 +114,11 @@ public class YarnContainerRunner {
 
     log.info("Samza FWK path: " + command + "; env=" + env);
 
-    Path path = new Path(yarnConfig.getPackagePath());
-    log.info("Starting container ID {} using package path {}", samzaContainerId, path);
+    Path packagePath = new Path(yarnConfig.getPackagePath());
+    log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
 
     startContainer(
-        path,
+        packagePath,
         container,
         env,
         getFormattedCommand(
@@ -151,6 +153,8 @@ public class YarnContainerRunner {
     log.info("starting container {} {} {} {}",
         new Object[]{packagePath, container, env, cmd});
 
+    // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one.
+    // But keep it now for backward compatibility.
     // set the local package so that the containers and app master are provisioned with it
     LocalResource packageResource = Records.newRecord(LocalResource.class);
     URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
@@ -163,6 +167,7 @@ public class YarnContainerRunner {
     }
 
     packageResource.setResource(packageUrl);
+    log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
     packageResource.setSize(fileStatus.getLen());
     packageResource.setTimestamp(fileStatus.getModificationTime());
     packageResource.setType(LocalResourceType.ARCHIVE);
@@ -190,13 +195,20 @@ public class YarnContainerRunner {
       throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer");
     }
 
+    Map<String, LocalResource> localResourceMap = new HashMap<>();
+    localResourceMap.put("__package", packageResource);
+
+    // include the resources from the universal resource configurations
+    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
+    localResourceMap.putAll(resourceMapper.getResourceMap());
+
     ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
     context.setEnvironment(env);
     context.setTokens(allTokens.duplicate());
     context.setCommands(new ArrayList<String>() {{add(cmd);}});
-    context.setLocalResources(Collections.singletonMap("__package", packageResource));
+    context.setLocalResources(localResourceMap);
 
-    log.debug("setting package to {}", packageResource);
+    log.debug("setting localResourceMap to {}", localResourceMap);
     log.debug("setting context to {}", context);
 
     StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
index 988a8e8..3f9a84d 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.samza.job.yarn;
 
 import org.apache.samza.clustermanager.ClusterResourceManager;

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index 6c63b63..e5aafbb 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -20,12 +20,12 @@
 package org.apache.samza.job.yarn
 
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.samza.config.{JobConfig, Config, YarnConfig}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamWriter}
+import org.apache.samza.config.{Config, JobConfig, YarnConfig}
+import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 
 import scala.collection.JavaConversions._
-import scala.collection.{Map}
+import scala.collection.Map
 import scala.collection.mutable.HashMap
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -140,6 +140,8 @@ class ClientHelper(conf: Configuration) extends Logging {
       case None =>
     }
 
+    // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one.
+    // But keep it now for backward compatibility.
     // set the local package so that the containers and app master are provisioned with it
     val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
     val fs = packagePath.getFileSystem(conf)
@@ -167,6 +169,17 @@ class ClientHelper(conf: Configuration) extends Logging {
     val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]()
     localResources += "__package" -> packageResource
 
+
+    // include the resources from the universal resource configurations
+    try {
+      val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf))
+      localResources ++= resourceMapper.getResourceMap
+    } catch {
+      case e: LocalizerResourceException => {
+        throw new SamzaException("Exception during resource mapping from config. ", e)
+      }
+    }
+
     if (UserGroupInformation.isSecurityEnabled()) {
       validateJobConfig(config)
 
@@ -187,6 +200,8 @@ class ClientHelper(conf: Configuration) extends Logging {
       coordinatorStreamWriter.stop()
     }
 
+    // prepare all local resources for localizer
+    info("localResources is: %s" format localResources)
     containerCtx.setLocalResources(localResources)
     info("set local resources on application master for %s" format appId.get)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
index 625d3bb..f057594 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
@@ -25,8 +25,10 @@ import org.apache.samza.job.StreamJobFactory
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.config.Config
 import org.apache.samza.util.hadoop.HttpFileSystem
+import org.apache.samza.util.Logging
+import scala.collection.JavaConversions._
 
-class YarnJobFactory extends StreamJobFactory {
+class YarnJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config) = {
     // TODO fix this. needed to support http package locations.
     val hConfig = new YarnConfiguration
@@ -37,6 +39,13 @@ class YarnJobFactory extends StreamJobFactory {
     if (config.containsKey(YarnConfiguration.RM_ADDRESS)) {
       hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032"))
     }
+
+    // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
+    val fsImplConfig = new FileSystemImplConfig(config)
+    fsImplConfig.getSchemes.foreach(
+      (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
+    )
+
     new YarnJob(config, hConfig)
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
new file mode 100644
index 0000000..6e11c66
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestFileSystemImplConfig {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFileSystemImplConfigSuccess() {
+    Map<String, String> configMap = new HashMap<>();
+    // Two "fs.<scheme>.impl" entries: one for http and one for a custom scheme.
+    configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
+    configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem");
+
+    Config conf = new MapConfig(configMap);
+    // NOTE(review): the index-based checks below assume getSchemes() returns a stable order - confirm.
+    FileSystemImplConfig manager = new FileSystemImplConfig(conf);
+    assertEquals(2, manager.getSchemes().size());
+    assertEquals("http", manager.getSchemes().get(0));
+    assertEquals("myscheme", manager.getSchemes().get(1));
+    // Each scheme maps back to its "fs.<scheme>.impl" config key ...
+    assertEquals("fs.http.impl", manager.getFsImplKey("http"));
+    assertEquals("fs.myscheme.impl", manager.getFsImplKey("myscheme"));
+    // ... and to the class name configured under that key.
+    assertEquals("org.apache.samza.HttpFileSystem", manager.getFsImplClassName("http"));
+    assertEquals("org.apache.samza.MySchemeFileSystem", manager.getFsImplClassName("myscheme"));
+  }
+
+  @Test
+  public void testNullConfig() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("config cannot be null");
+    new FileSystemImplConfig(null); // only the constructor call matters; it must reject null
+  }
+
+  @Test
+  public void testEmptyImpl() {
+    thrown.expect(LocalizerResourceException.class);
+    thrown.expectMessage("fs.http.impl does not have configured class implementation");
+    // "fs.http.impl" is present but maps to an empty class name.
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("fs.http.impl", "");
+    Config conf = new MapConfig(configMap);
+    // Either the constructor or the lookup below must raise the expected exception.
+    FileSystemImplConfig manager = new FileSystemImplConfig(conf);
+    manager.getFsImplClassName("http");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java
new file mode 100644
index 0000000..e003125
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestLocalizerResourceConfig {
+
+  @Rule
+  public ExpectedException thrown= ExpectedException.none();
+
+  @Test
+  public void testResourceConfigIncluded() {
+    Map<String, String> configMap = new HashMap<>();
+    // A fully specified resource: path plus local name, type and visibility.
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "public");
+
+    Config conf = new MapConfig(configMap);
+    // The resource is listed by name; its lowercase type/visibility map to the YARN enums.
+    LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
+    assertEquals(1, manager.getResourceNames().size());
+    assertEquals("myResource1", manager.getResourceNames().get(0));
+    assertEquals("readme", manager.getResourceLocalName("myResource1"));
+    assertEquals(LocalResourceType.FILE, manager.getResourceLocalType("myResource1"));
+    assertEquals(LocalResourceVisibility.PUBLIC, manager.getResourceLocalVisibility("myResource1"));
+  }
+
+  @Test
+  public void testResourceConfigNotIncluded() {
+    Map<String, String> configMap = new HashMap<>();
+    // No "yarn.resources.myResource2.path" key is set, so the resource must not be listed.
+    configMap.put("otherconfig", "https://host2.com/not_included");
+    configMap.put("yarn.resources.myResource2.local.name", "notExisting");
+    configMap.put("yarn.resources.myResource2.local.type", "file");
+    configMap.put("yarn.resources.myResource2.local.visibility", "application");
+
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
+    assertEquals(0, manager.getResourceNames().size());
+  }
+
+  @Test
+  public void testNullConfig() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("config cannot be null");
+    new LocalizerResourceConfig(null); // only the constructor call matters; it must reject null
+  }
+
+  @Test
+  public void testInvalidVisibility() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceVisibility.INVALIDVISIBILITY");
+    // The configured visibility is mixed-case; the expected message carries it upper-cased.
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "invalidVisibility");
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
+    manager.getResourceLocalVisibility("myResource1");
+  }
+
+  @Test
+  public void testInvalidType() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE");
+    // The configured type is mixed-case; the expected message carries it upper-cased.
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "invalidType");
+    configMap.put("yarn.resources.myResource1.local.visibility", "application");
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
+    manager.getResourceLocalType("myResource1");
+  }
+
+  @Test
+  public void testInvalidPath() {
+    thrown.expect(LocalizerResourceException.class);
+    thrown.expectMessage("resource path is required but not defined in config for resource myResource1");
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", ""); // the empty path is the only invalid entry
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "application");
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
+    manager.getResourcePath("myResource1");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java
new file mode 100644
index 0000000..d065019
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestLocalizerResourceMapper {
+
+  @Rule
+  public ExpectedException thrown= ExpectedException.none();
+
+  @Test
+  public void testResourceMapSuccess() {
+    // Three complete resources (one per visibility level) plus a fourth that lacks a path.
+    Map<String, String> configMap = new HashMap<>();
+
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "public");
+
+    configMap.put("yarn.resources.myResource2.path", "https://host2.com/package");
+    configMap.put("yarn.resources.myResource2.local.name", "__package");
+    configMap.put("yarn.resources.myResource2.local.type", "archive");
+    configMap.put("yarn.resources.myResource2.local.visibility", "private");
+
+    configMap.put("yarn.resources.myResource3.path", "https://host3.com/csr");
+    configMap.put("yarn.resources.myResource3.local.name", "csr");
+    configMap.put("yarn.resources.myResource3.local.type", "file");
+    configMap.put("yarn.resources.myResource3.local.visibility", "application");
+    // myResource4 has no ".path" entry; "otherconfig" is an unrelated key.
+    configMap.put("otherconfig", "https://host4.com/not_included");
+    configMap.put("yarn.resources.myResource4.local.name", "notExisting");
+    configMap.put("yarn.resources.myResource4.local.type", "file");
+    configMap.put("yarn.resources.myResource4.local.visibility", "application");
+
+    Config conf = new MapConfig(configMap);
+    // Register HttpFileSystem for both URL schemes used by the paths above.
+    YarnConfiguration yarnConfiguration = new YarnConfiguration();
+    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+    yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+
+    LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+    Map<String, LocalResource> resourceMap = mapper.getResourceMap();
+    // The map is looked up by each resource's configured local name.
+    assertEquals("resourceMap has 3 resources", 3, resourceMap.size());
+
+    // resource 1
+    assertEquals("host1.com", resourceMap.get("readme").getResource().getHost());
+    assertEquals(LocalResourceType.FILE, resourceMap.get("readme").getType());
+    assertEquals(LocalResourceVisibility.PUBLIC, resourceMap.get("readme").getVisibility());
+
+    // resource 2
+    assertEquals("host2.com", resourceMap.get("__package").getResource().getHost());
+    assertEquals(LocalResourceType.ARCHIVE, resourceMap.get("__package").getType());
+    assertEquals(LocalResourceVisibility.PRIVATE, resourceMap.get("__package").getVisibility());
+
+    // resource 3
+    assertEquals("host3.com", resourceMap.get("csr").getResource().getHost());
+    assertEquals(LocalResourceType.FILE, resourceMap.get("csr").getType());
+    assertEquals(LocalResourceVisibility.APPLICATION, resourceMap.get("csr").getVisibility());
+
+    // resource 4 should not exist under its resource name or its configured local name
+    assertNull("Resource does not exist with the name myResource4", resourceMap.get("myResource4"));
+    assertNull("Resource does not exist with the defined config name notExisting for myResource4 either", resourceMap.get("notExisting"));
+  }
+
+  @Test
+  public void testResourceMapWithDefaultValues() {
+    // Only the path is configured; local name, type and visibility are left unset.
+    Map<String, String> configMap = new HashMap<>();
+
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+
+    Config conf = new MapConfig(configMap);
+
+    YarnConfiguration yarnConfiguration = new YarnConfiguration();
+    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+
+    LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+    Map<String, LocalResource> resourceMap = mapper.getResourceMap();
+    // Expected fallbacks: keyed by the resource name, type FILE, visibility APPLICATION.
+    assertNull("Resource does not exist with a name readme", resourceMap.get("readme"));
+    assertNotNull("Resource exists with a name myResource1", resourceMap.get("myResource1"));
+    assertEquals("host1.com", resourceMap.get("myResource1").getResource().getHost());
+    assertEquals(LocalResourceType.FILE, resourceMap.get("myResource1").getType());
+    assertEquals(LocalResourceVisibility.APPLICATION, resourceMap.get("myResource1").getVisibility());
+  }
+
+  @Test
+  public void testResourceMapWithFileStatusFailure() {
+    thrown.expect(LocalizerResourceException.class);
+    thrown.expectMessage("IO Exception when accessing the resource file status from the filesystem");
+    // The path uses the "unknown" scheme; no "fs.unknown.impl" is registered below.
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "unknown://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "public");
+    Config conf = new MapConfig(configMap);
+    // The exception must surface during construction, so no reference is kept.
+    YarnConfiguration yarnConfiguration = new YarnConfiguration();
+    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+    yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+    new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+  }
+
+  @Test
+  public void testResourceMapWithInvalidVisibilityFailure() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceVisibility.INVALIDVISIBILITY");
+    // The configured visibility is mixed-case; the expected message carries it upper-cased.
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "invalidVisibility");
+    Config conf = new MapConfig(configMap);
+    // The exception must surface during construction, so no reference is kept.
+    YarnConfiguration yarnConfiguration = new YarnConfiguration();
+    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+    yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+    new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+  }
+
+  @Test
+  public void testResourceMapWithInvalidTypeFailure() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE");
+    // The configured type is mixed-case; the expected message carries it upper-cased.
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "invalidType");
+    configMap.put("yarn.resources.myResource1.local.visibility", "public");
+    Config conf = new MapConfig(configMap);
+    // The exception must surface during construction, so no reference is kept.
+    YarnConfiguration yarnConfiguration = new YarnConfiguration();
+    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+    yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+    new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
new file mode 100644
index 0000000..11077f0
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestYarnJobFactory {
+  @Test
+  public void testGetJobWithDefaultFsImpl() {
+    YarnJobFactory jobFactory = new YarnJobFactory();
+    YarnJob yarnJob = jobFactory.getJob(new MapConfig()); // empty job config: no fs.*.impl keys
+    Configuration hConfig = yarnJob.client().yarnClient().getConfig();
+    assertEquals(HttpFileSystem.class.getName(), hConfig.get("fs.http.impl")); // http default
+    assertEquals(HttpFileSystem.class.getName(), hConfig.get("fs.https.impl")); // https default
+  }
+
+  @Test
+  public void testGetJobWithFsImplOverride() {
+    YarnJobFactory jobFactory = new YarnJobFactory();
+    YarnJob yarnJob = jobFactory.getJob(new MapConfig(ImmutableMap.of(
+        "fs.http.impl", "org.apache.myHttp",
+        "fs.myscheme.impl", "org.apache.myScheme")));
+    Configuration hConfig = yarnJob.client().yarnClient().getConfig(); // config the YARN client was built with
+    assertEquals("org.apache.myHttp", hConfig.get("fs.http.impl")); // job value replaces the default
+    assertEquals("org.apache.myScheme", hConfig.get("fs.myscheme.impl")); // extra scheme is passed through
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala
deleted file mode 100644
index 110c0fc..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.samza.config.MapConfig
-import org.apache.samza.util.hadoop.HttpFileSystem
-import org.junit.Assert._
-import org.junit.Test
-
-
-class TestYarnJobFactory {
-
-  @Test
-  def testGetJob {
-
-    val jobFactory = new YarnJobFactory
-
-    val yarnJob = jobFactory.getJob(new MapConfig)
-
-    val hConfig = yarnJob.client.yarnClient.getConfig
-
-    assertEquals(classOf[HttpFileSystem].getName, hConfig.get("fs.http.impl"))
-
-    assertEquals(classOf[HttpFileSystem].getName, hConfig.get("fs.https.impl"))
-
-  }
-}
-