You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/27 17:34:18 UTC
samza git commit: SAMZA-1143;
Universal config support for localized resource
Repository: samza
Updated Branches:
refs/heads/master 9d52e996b -> e4cfeebeb
SAMZA-1143; Universal config support for localized resource
More details in https://issues.apache.org/jira/browse/SAMZA-1143
Tests: ./gradlew clean check succeeded, and all unit tests passed.
Author: Fred Ji <fj...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #90 from fredji97/universalLocalizer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4cfeebe
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4cfeebe
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4cfeebe
Branch: refs/heads/master
Commit: e4cfeebebf3ec94ca3b5aca2e4aed10e77db38ae
Parents: 9d52e99
Author: Fred Ji <fj...@linkedin.com>
Authored: Mon Mar 27 10:34:12 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Mar 27 10:34:12 2017 -0700
----------------------------------------------------------------------
.../samza/job/yarn/FileSystemImplConfig.java | 86 +++++++++
.../samza/job/yarn/LocalizerResourceConfig.java | 102 +++++++++++
.../job/yarn/LocalizerResourceException.java | 72 ++++++++
.../samza/job/yarn/LocalizerResourceMapper.java | 101 +++++++++++
.../job/yarn/YarnClusterResourceManager.java | 8 +-
.../samza/job/yarn/YarnContainerRunner.java | 26 ++-
.../job/yarn/YarnResourceManagerFactory.java | 1 -
.../apache/samza/job/yarn/ClientHelper.scala | 21 ++-
.../apache/samza/job/yarn/YarnJobFactory.scala | 11 +-
.../job/yarn/TestFileSystemImplConfig.java | 75 ++++++++
.../job/yarn/TestLocalizerResourceConfig.java | 125 +++++++++++++
.../job/yarn/TestLocalizerResourceMapper.java | 174 +++++++++++++++++++
.../samza/job/yarn/TestYarnJobFactory.java | 48 +++++
.../samza/job/yarn/TestYarnJobFactory.scala | 45 -----
14 files changed, 836 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
new file mode 100644
index 0000000..8e79104
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * FileSystemImplConfig manages the Samza configs of the form {@code fs.<scheme>.impl},
+ * e.g. {@code fs.http.impl}, which map a file system scheme to the fully qualified
+ * class name of the file system implementation for that scheme.
+ */
+public class FileSystemImplConfig {
+  private static final Logger log = LoggerFactory.getLogger(FileSystemImplConfig.class);
+  private static final String FS_IMPL_PREFIX = "fs.";
+  private static final String FS_IMPL_SUFFIX = ".impl";
+  private static final String FS_IMPL = "fs.%s.impl";
+
+  private final Config config;
+
+  /**
+   * @param config the Samza job config to read {@code fs.<scheme>.impl} entries from
+   * @throws IllegalArgumentException if {@code config} is null
+   */
+  public FileSystemImplConfig(final Config config) {
+    if (null == config) {
+      throw new IllegalArgumentException("config cannot be null");
+    }
+    this.config = config;
+  }
+
+  /**
+   * Get all schemes that have an {@code fs.<scheme>.impl} entry in the config.
+   * Keys under {@code fs.} that do not end in {@code .impl} are ignored.
+   *
+   * @return list of scheme names sorted in natural order, e.g. [http, myscheme];
+   *         empty if none are configured
+   */
+  public List<String> getSchemes() {
+    Config subConfig = config.subset(FS_IMPL_PREFIX, true);
+    List<String> schemes = new ArrayList<String>();
+    for (String key : subConfig.keySet()) {
+      if (key.endsWith(FS_IMPL_SUFFIX)) {
+        schemes.add(key.substring(0, key.length() - FS_IMPL_SUFFIX.length()));
+      }
+    }
+    // Key iteration order depends on the map backing the config (e.g. a HashMap),
+    // so sort to make the result deterministic.
+    java.util.Collections.sort(schemes);
+    return schemes;
+  }
+
+  /**
+   * Get the {@code fs.<scheme>.impl} config key for the given scheme.
+   *
+   * @param scheme scheme name, such as http, hdfs, myscheme
+   * @return the key {@code fs.<scheme>.impl}
+   */
+  public String getFsImplKey(final String scheme) {
+    return String.format(FS_IMPL, scheme);
+  }
+
+  /**
+   * Get the file system implementation class name configured for the given scheme.
+   *
+   * @param scheme scheme name, such as http, hdfs, myscheme
+   * @return fully qualified class name of the file system for {@code scheme}
+   * @throws LocalizerResourceException if {@code fs.<scheme>.impl} is missing or empty
+   */
+  public String getFsImplClassName(final String scheme) {
+    String fsImplKey = getFsImplKey(scheme);
+    String fsImplClassName = config.get(fsImplKey);
+    if (StringUtils.isEmpty(fsImplClassName)) {
+      throw new LocalizerResourceException(fsImplKey + " does not have configured class implementation");
+    }
+    return fsImplClassName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java
new file mode 100644
index 0000000..ca94783
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.samza.config.Config;
+
+/**
+ * LocalizerResourceConfig is intended to manage/fetch the config values
+ * for the yarn localizer resource(s) from the configuration.
+ *
+ * There are 4 config values:
+ * yarn.resources.<resourceName>.path
+ *   (Required) The path for fetching the resource for localization,
+ *   e.g. http://hostname.com/test.
+ * yarn.resources.<resourceName>.local.name
+ *   (Optional) The local name used for the localized resource.
+ *   If not set, the default is the <resourceName> from the config key.
+ * yarn.resources.<resourceName>.local.type
+ *   (Optional) The valid value is a string format of {@link LocalResourceType}:
+ *   ARCHIVE, FILE, PATTERN (case-insensitive).
+ *   If not set, the default value is FILE.
+ * yarn.resources.<resourceName>.local.visibility
+ *   (Optional) The valid value is a string format of {@link LocalResourceVisibility}:
+ *   PUBLIC, PRIVATE, or APPLICATION (case-insensitive).
+ *   If not set, the default value is APPLICATION.
+ */
+public class LocalizerResourceConfig {
+  private static final String RESOURCE_PREFIX = "yarn.resources.";
+  private static final String PATH_SUFFIX = ".path";
+  private static final String RESOURCE_PATH = "yarn.resources.%s.path";
+  private static final String RESOURCE_LOCAL_NAME = "yarn.resources.%s.local.name";
+  private static final String RESOURCE_LOCAL_TYPE = "yarn.resources.%s.local.type";
+  private static final String RESOURCE_LOCAL_VISIBILITY = "yarn.resources.%s.local.visibility";
+  private static final String DEFAULT_RESOURCE_LOCAL_TYPE = "FILE";
+  private static final String DEFAULT_RESOURCE_LOCAL_VISIBILITY = "APPLICATION";
+
+  private final Config config;
+
+  /**
+   * @param config the Samza job config to read {@code yarn.resources.*} entries from
+   * @throws IllegalArgumentException if {@code config} is null
+   */
+  public LocalizerResourceConfig(final Config config) {
+    if (null == config) {
+      throw new IllegalArgumentException("config cannot be null");
+    }
+    this.config = config;
+  }
+
+  /**
+   * @return the names of all resources that have a {@code yarn.resources.<resourceName>.path} entry
+   */
+  public List<String> getResourceNames() {
+    Config subConfig = config.subset(RESOURCE_PREFIX, true);
+    List<String> resourceNames = new ArrayList<String>();
+    for (String key : subConfig.keySet()) {
+      if (key.endsWith(PATH_SUFFIX)) {
+        resourceNames.add(key.substring(0, key.length() - PATH_SUFFIX.length()));
+      }
+    }
+    return resourceNames;
+  }
+
+  /**
+   * @param resourceName the resource name from the config key
+   * @return the configured path of the resource
+   * @throws LocalizerResourceException if the path is missing or empty
+   */
+  public Path getResourcePath(final String resourceName) {
+    String pathStr = config.get(String.format(RESOURCE_PATH, resourceName));
+    if (StringUtils.isEmpty(pathStr)) {
+      throw new LocalizerResourceException("resource path is required but not defined in config for resource " + resourceName);
+    }
+    return new Path(pathStr);
+  }
+
+  /**
+   * @param resourceName the resource name from the config key
+   * @return the configured local type, FILE if not set
+   * @throws IllegalArgumentException if the configured value is not a {@link LocalResourceType} name
+   */
+  public LocalResourceType getResourceLocalType(final String resourceName) {
+    String typeStr = config.get(String.format(RESOURCE_LOCAL_TYPE, resourceName), DEFAULT_RESOURCE_LOCAL_TYPE);
+    // Locale.ROOT keeps the enum lookup independent of the JVM default locale
+    // (e.g. a Turkish locale would turn "file" into "FİLE").
+    return LocalResourceType.valueOf(typeStr.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * @param resourceName the resource name from the config key
+   * @return the configured local visibility, APPLICATION if not set
+   * @throws IllegalArgumentException if the configured value is not a {@link LocalResourceVisibility} name
+   */
+  public LocalResourceVisibility getResourceLocalVisibility(final String resourceName) {
+    String visibilityStr = config.get(String.format(RESOURCE_LOCAL_VISIBILITY, resourceName), DEFAULT_RESOURCE_LOCAL_VISIBILITY);
+    // Locale-independent upper-casing, see getResourceLocalType.
+    return LocalResourceVisibility.valueOf(visibilityStr.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * @param resourceName the resource name from the config key
+   * @return the configured local name, or {@code resourceName} itself if not set
+   */
+  public String getResourceLocalName(final String resourceName) {
+    return config.get(String.format(RESOURCE_LOCAL_NAME, resourceName), resourceName);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java
new file mode 100644
index 0000000..0df6903
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+/**
+ * Unchecked exception raised when a YARN localizer resource, or the config describing it,
+ * cannot be resolved: for example a missing {@code yarn.resources.<name>.path}, an
+ * {@code fs.<scheme>.impl} entry without a class, or a file system error while reading
+ * a resource's status.
+ */
+public class LocalizerResourceException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with {@code null}
+   * as its error detail message.
+   */
+  public LocalizerResourceException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with the specified detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LocalizerResourceException(String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with the specified detail message
+   * and cause.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   *
+   * @param cause
+   *        The cause (which is saved for later retrieval by the
+   *        {@link #getCause()} method). (A null value is permitted,
+   *        and indicates that the cause is nonexistent or unknown.)
+   */
+  public LocalizerResourceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Constructs a {@code LocalizerResourceException} with the specified cause and a
+   * detail message of {@code (cause==null ? null : cause.toString())}.
+   *
+   * @param cause
+   *        The cause (which is saved for later retrieval by the
+   *        {@link #getCause()} method). (A null value is permitted,
+   *        and indicates that the cause is nonexistent or unknown.)
+   */
+  public LocalizerResourceException(Throwable cause) {
+    super(cause);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java
new file mode 100644
index 0000000..6dddb0a
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A universal approach to generate local resource map which can be put in ContainerLaunchContext directly
+ */
+public class LocalizerResourceMapper {
+ private static final Logger log = LoggerFactory.getLogger(LocalizerResourceMapper.class);
+
+ private final YarnConfiguration yarnConfiguration; //yarn configurations
+ private final LocalizerResourceConfig resourceConfig;
+ private final Map<String, LocalResource> localResourceMap;
+
+ public LocalizerResourceMapper(LocalizerResourceConfig resourceConfig, YarnConfiguration yarnConfiguration) {
+ this.yarnConfiguration = yarnConfiguration;
+ this.resourceConfig = resourceConfig;
+ this.localResourceMap = buildResourceMapping();
+ }
+
+ private Map<String, LocalResource> buildResourceMapping() {
+ ImmutableMap.Builder<String, LocalResource> localResourceMapBuilder = ImmutableMap.builder();
+
+ List<String> resourceNames = resourceConfig.getResourceNames();
+ for (String resourceName : resourceNames) {
+ String resourceLocalName = resourceConfig.getResourceLocalName(resourceName);
+ LocalResourceType resourceType = resourceConfig.getResourceLocalType(resourceName);
+ LocalResourceVisibility resourceVisibility = resourceConfig.getResourceLocalVisibility(resourceName);
+ Path resourcePath = resourceConfig.getResourcePath(resourceName);
+
+ LocalResource localResource = createLocalResource(resourcePath, resourceType, resourceVisibility);
+
+ localResourceMapBuilder.put(resourceLocalName, localResource);
+ log.info("preparing local resource: {}", resourceLocalName);
+ }
+
+ return localResourceMapBuilder.build();
+ }
+
+ private LocalResource createLocalResource(Path resourcePath, LocalResourceType resourceType, LocalResourceVisibility resourceVisibility) {
+ LocalResource localResource = Records.newRecord(LocalResource.class);
+ URL resourceUrl = ConverterUtils.getYarnUrlFromPath(resourcePath);
+ try {
+ FileStatus resourceFileStatus = resourcePath.getFileSystem(yarnConfiguration).getFileStatus(resourcePath);
+
+ if (null == resourceFileStatus) {
+ throw new LocalizerResourceException("Check getFileStatus implementation. getFileStatus gets unexpected null for resourcePath " + resourcePath);
+ }
+
+ localResource.setResource(resourceUrl);
+ log.info("setLocalizerResource for {}", resourceUrl);
+ localResource.setSize(resourceFileStatus.getLen());
+ localResource.setTimestamp(resourceFileStatus.getModificationTime());
+ localResource.setType(resourceType);
+ localResource.setVisibility(resourceVisibility);
+ return localResource;
+ } catch (IOException ioe) {
+ log.error("IO Exception when accessing the resource file status from the filesystem: " + resourcePath, ioe);
+ throw new LocalizerResourceException("IO Exception when accessing the resource file status from the filesystem: " + resourcePath);
+ }
+
+ }
+
+ public Map<String, LocalResource> getResourceMap() {
+ return ImmutableMap.copyOf(localResourceMap);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index c1b71bb..04c78be 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -36,12 +36,10 @@ import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.CommandBuilder;
-import org.apache.samza.job.yarn.YarnContainer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -120,6 +118,12 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
hConfig = new YarnConfiguration();
hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
+ // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
+ FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
+ fsImplConfig.getSchemes().forEach(
+ scheme -> hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
+ );
+
MetricsRegistryMap registry = new MetricsRegistryMap();
metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry);
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index 181102b..c45fc7f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -43,10 +43,12 @@ import org.apache.samza.job.CommandBuilder;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
/**
* A Helper class to run container processes on Yarn. This encapsulates quite a bit of YarnContainer
@@ -112,11 +114,11 @@ public class YarnContainerRunner {
log.info("Samza FWK path: " + command + "; env=" + env);
- Path path = new Path(yarnConfig.getPackagePath());
- log.info("Starting container ID {} using package path {}", samzaContainerId, path);
+ Path packagePath = new Path(yarnConfig.getPackagePath());
+ log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
startContainer(
- path,
+ packagePath,
container,
env,
getFormattedCommand(
@@ -151,6 +153,8 @@ public class YarnContainerRunner {
log.info("starting container {} {} {} {}",
new Object[]{packagePath, container, env, cmd});
+ // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one.
+ // But keep it now for backward compatibility.
// set the local package so that the containers and app master are provisioned with it
LocalResource packageResource = Records.newRecord(LocalResource.class);
URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
@@ -163,6 +167,7 @@ public class YarnContainerRunner {
}
packageResource.setResource(packageUrl);
+ log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
packageResource.setSize(fileStatus.getLen());
packageResource.setTimestamp(fileStatus.getModificationTime());
packageResource.setType(LocalResourceType.ARCHIVE);
@@ -190,13 +195,20 @@ public class YarnContainerRunner {
throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer");
}
+ Map<String, LocalResource> localResourceMap = new HashMap<>();
+ localResourceMap.put("__package", packageResource);
+
+ // include the resources from the universal resource configurations
+ LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
+ localResourceMap.putAll(resourceMapper.getResourceMap());
+
ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
context.setEnvironment(env);
context.setTokens(allTokens.duplicate());
context.setCommands(new ArrayList<String>() {{add(cmd);}});
- context.setLocalResources(Collections.singletonMap("__package", packageResource));
+ context.setLocalResources(localResourceMap);
- log.debug("setting package to {}", packageResource);
+ log.debug("setting localResourceMap to {}", localResourceMap);
log.debug("setting context to {}", context);
StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
index 988a8e8..3f9a84d 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.samza.job.yarn;
import org.apache.samza.clustermanager.ClusterResourceManager;
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index 6c63b63..e5aafbb 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -20,12 +20,12 @@
package org.apache.samza.job.yarn
import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.samza.config.{JobConfig, Config, YarnConfig}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamWriter}
+import org.apache.samza.config.{Config, JobConfig, YarnConfig}
+import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
import org.apache.samza.coordinator.stream.messages.SetConfig
import scala.collection.JavaConversions._
-import scala.collection.{Map}
+import scala.collection.Map
import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -140,6 +140,8 @@ class ClientHelper(conf: Configuration) extends Logging {
case None =>
}
+ // TODO: remove the customized approach for package resource and use the common one.
+ // But keep it now for backward compatibility.
// set the local package so that the containers and app master are provisioned with it
val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
val fs = packagePath.getFileSystem(conf)
@@ -167,6 +169,17 @@ class ClientHelper(conf: Configuration) extends Logging {
val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]()
localResources += "__package" -> packageResource
+
+ // include the resources from the universal resource configurations
+ try {
+ val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf))
+ localResources ++= resourceMapper.getResourceMap
+ } catch {
+ case e: LocalizerResourceException => {
+ throw new SamzaException("Exception during resource mapping from config. ", e)
+ }
+ }
+
if (UserGroupInformation.isSecurityEnabled()) {
validateJobConfig(config)
@@ -187,6 +200,8 @@ class ClientHelper(conf: Configuration) extends Logging {
coordinatorStreamWriter.stop()
}
+ // prepare all local resources for localizer
+ info("localResources is: %s" format localResources)
containerCtx.setLocalResources(localResources)
info("set local resources on application master for %s" format appId.get)
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
index 625d3bb..f057594 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
@@ -25,8 +25,10 @@ import org.apache.samza.job.StreamJobFactory
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.samza.config.Config
import org.apache.samza.util.hadoop.HttpFileSystem
+import org.apache.samza.util.Logging
+import scala.collection.JavaConversions._
-class YarnJobFactory extends StreamJobFactory {
+class YarnJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config) = {
// TODO fix this. needed to support http package locations.
val hConfig = new YarnConfiguration
@@ -37,6 +39,13 @@ class YarnJobFactory extends StreamJobFactory {
if (config.containsKey(YarnConfiguration.RM_ADDRESS)) {
hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032"))
}
+
+ // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
+ val fsImplConfig = new FileSystemImplConfig(config)
+ fsImplConfig.getSchemes.foreach(
+ (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
+ )
+
new YarnJob(config, hConfig)
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
new file mode 100644
index 0000000..6e11c66
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestFileSystemImplConfig {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFileSystemImplConfigSuccess() {
+    Map<String, String> configMap = new HashMap<>();
+
+    configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
+    configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem");
+
+    Config conf = new MapConfig(configMap);
+
+    FileSystemImplConfig manager = new FileSystemImplConfig(conf);
+    // configMap is a HashMap, so do not depend on the iteration order of the schemes.
+    assertEquals(2, manager.getSchemes().size());
+    assertEquals(new java.util.HashSet<>(java.util.Arrays.asList("http", "myscheme")),
+        new java.util.HashSet<>(manager.getSchemes()));
+
+    assertEquals("fs.http.impl", manager.getFsImplKey("http"));
+    assertEquals("fs.myscheme.impl", manager.getFsImplKey("myscheme"));
+
+    assertEquals("org.apache.samza.HttpFileSystem", manager.getFsImplClassName("http"));
+    assertEquals("org.apache.samza.MySchemeFileSystem", manager.getFsImplClassName("myscheme"));
+  }
+
+  @Test
+  public void testNonImplKeysIgnored() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
+    configMap.put("fs.defaultFS", "hdfs://localhost:9000");
+
+    FileSystemImplConfig manager = new FileSystemImplConfig(new MapConfig(configMap));
+    assertEquals(java.util.Collections.singletonList("http"), manager.getSchemes());
+  }
+
+  @Test
+  public void testNullConfig() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("config cannot be null");
+    new FileSystemImplConfig(null);
+  }
+
+  @Test
+  public void testEmptyImpl() {
+    thrown.expect(LocalizerResourceException.class);
+    thrown.expectMessage("fs.http.impl does not have configured class implementation");
+
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("fs.http.impl", "");
+    Config conf = new MapConfig(configMap);
+
+    FileSystemImplConfig manager = new FileSystemImplConfig(conf);
+    manager.getFsImplClassName("http");
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java
new file mode 100644
index 0000000..e003125
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestLocalizerResourceConfig {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** A fully specified resource is discovered and its local name, type and visibility are parsed. */
+  @Test
+  public void testResourceConfigIncluded() {
+    Map<String, String> configMap = new HashMap<>();
+
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "public");
+
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig resourceConfig = new LocalizerResourceConfig(conf);
+    assertEquals(1, resourceConfig.getResourceNames().size());
+    assertEquals("myResource1", resourceConfig.getResourceNames().get(0));
+    assertEquals("readme", resourceConfig.getResourceLocalName("myResource1"));
+    assertEquals(LocalResourceType.FILE, resourceConfig.getResourceLocalType("myResource1"));
+    assertEquals(LocalResourceVisibility.PUBLIC, resourceConfig.getResourceLocalVisibility("myResource1"));
+  }
+
+  /**
+   * A resource without a {@code .path} entry is not discovered, even when its other
+   * {@code local.*} properties are set.
+   */
+  @Test
+  public void testResourceConfigNotIncluded() {
+    Map<String, String> configMap = new HashMap<>();
+
+    configMap.put("otherconfig", "https://host2.com/not_included");
+    configMap.put("yarn.resources.myResource2.local.name", "notExisting");
+    configMap.put("yarn.resources.myResource2.local.type", "file");
+    configMap.put("yarn.resources.myResource2.local.visibility", "application");
+
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig resourceConfig = new LocalizerResourceConfig(conf);
+    assertEquals(0, resourceConfig.getResourceNames().size());
+  }
+
+  /** Constructing a LocalizerResourceConfig from a null config is rejected. */
+  @Test
+  public void testNullConfig() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("config cannot be null");
+    // No local is kept: the constructor call itself is expected to throw.
+    new LocalizerResourceConfig(null);
+  }
+
+  /** An unknown visibility value is rejected when the visibility is read. */
+  @Test
+  public void testInvalidVisibility() {
+    // Expected message has the form produced by Enum.valueOf for the upper-cased value.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceVisibility.INVALIDVISIBILITY");
+
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "invalidVisibility");
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig resourceConfig = new LocalizerResourceConfig(conf);
+    resourceConfig.getResourceLocalVisibility("myResource1");
+  }
+
+  /** An unknown resource type is rejected when the type is read. */
+  @Test
+  public void testInvalidType() {
+    // Expected message has the form produced by Enum.valueOf for the upper-cased value.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE");
+
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "invalidType");
+    configMap.put("yarn.resources.myResource1.local.visibility", "application");
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig resourceConfig = new LocalizerResourceConfig(conf);
+    resourceConfig.getResourceLocalType("myResource1");
+  }
+
+  /** An empty resource path is rejected when the path is read. */
+  @Test
+  public void testInvalidPath() {
+    thrown.expect(LocalizerResourceException.class);
+    thrown.expectMessage("resource path is required but not defined in config for resource myResource1");
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("yarn.resources.myResource1.path", "");
+    // All other properties are valid so that the empty path is the only thing under test.
+    configMap.put("yarn.resources.myResource1.local.name", "readme");
+    configMap.put("yarn.resources.myResource1.local.type", "file");
+    configMap.put("yarn.resources.myResource1.local.visibility", "application");
+    Config conf = new MapConfig(configMap);
+
+    LocalizerResourceConfig resourceConfig = new LocalizerResourceConfig(conf);
+    resourceConfig.getResourcePath("myResource1");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java
new file mode 100644
index 0000000..d065019
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestLocalizerResourceMapper {
+
+ @Rule
+ public ExpectedException thrown= ExpectedException.none();
+
+ @Test
+ public void testResourceMapSuccess() {
+
+ Map<String, String> configMap = new HashMap<>();
+
+ configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+ configMap.put("yarn.resources.myResource1.local.name", "readme");
+ configMap.put("yarn.resources.myResource1.local.type", "file");
+ configMap.put("yarn.resources.myResource1.local.visibility", "public");
+
+ configMap.put("yarn.resources.myResource2.path", "https://host2.com/package");
+ configMap.put("yarn.resources.myResource2.local.name", "__package");
+ configMap.put("yarn.resources.myResource2.local.type", "archive");
+ configMap.put("yarn.resources.myResource2.local.visibility", "private");
+
+ configMap.put("yarn.resources.myResource3.path", "https://host3.com/csr");
+ configMap.put("yarn.resources.myResource3.local.name", "csr");
+ configMap.put("yarn.resources.myResource3.local.type", "file");
+ configMap.put("yarn.resources.myResource3.local.visibility", "application");
+
+ configMap.put("otherconfig", "https://host4.com/not_included");
+ configMap.put("yarn.resources.myResource4.local.name", "notExisting");
+ configMap.put("yarn.resources.myResource4.local.type", "file");
+ configMap.put("yarn.resources.myResource4.local.visibility", "application");
+
+ Config conf = new MapConfig(configMap);
+
+ YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+ yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+
+ LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+ Map<String, LocalResource> resourceMap = mapper.getResourceMap();
+
+ assertEquals("resourceMap has 3 resources", 3, resourceMap.size());
+
+ // resource1
+ assertEquals("host1.com", resourceMap.get("readme").getResource().getHost());
+ assertEquals(LocalResourceType.FILE, resourceMap.get("readme").getType());
+ assertEquals(LocalResourceVisibility.PUBLIC, resourceMap.get("readme").getVisibility());
+
+ // resource 2
+ assertEquals("host2.com", resourceMap.get("__package").getResource().getHost());
+ assertEquals(LocalResourceType.ARCHIVE, resourceMap.get("__package").getType());
+ assertEquals(LocalResourceVisibility.PRIVATE, resourceMap.get("__package").getVisibility());
+
+ // resource 3
+ assertEquals("host3.com", resourceMap.get("csr").getResource().getHost());
+ assertEquals(LocalResourceType.FILE, resourceMap.get("csr").getType());
+ assertEquals(LocalResourceVisibility.APPLICATION, resourceMap.get("csr").getVisibility());
+
+ // resource 4 should not exist
+ assertNull("Resource does not exist with the name myResource4", resourceMap.get("myResource4"));
+ assertNull("Resource does not exist with the defined config name notExisting for myResource4 either", resourceMap.get("notExisting"));
+ }
+
+ @Test
+ public void testResourceMapWithDefaultValues() {
+
+ Map<String, String> configMap = new HashMap<>();
+
+ configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+
+ Config conf = new MapConfig(configMap);
+
+ YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+
+ LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+ Map<String, LocalResource> resourceMap = mapper.getResourceMap();
+
+ assertNull("Resource does not exist with a name readme", resourceMap.get("readme"));
+ assertNotNull("Resource exists with a name myResource1", resourceMap.get("myResource1"));
+ assertEquals("host1.com", resourceMap.get("myResource1").getResource().getHost());
+ assertEquals(LocalResourceType.FILE, resourceMap.get("myResource1").getType());
+ assertEquals(LocalResourceVisibility.APPLICATION, resourceMap.get("myResource1").getVisibility());
+ }
+
+ @Test
+ public void testResourceMapWithFileStatusFailure() {
+ thrown.expect(LocalizerResourceException.class);
+ thrown.expectMessage("IO Exception when accessing the resource file status from the filesystem");
+
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("yarn.resources.myResource1.path", "unknown://host1.com/readme");
+ configMap.put("yarn.resources.myResource1.local.name", "readme");
+ configMap.put("yarn.resources.myResource1.local.type", "file");
+ configMap.put("yarn.resources.myResource1.local.visibility", "public");
+ Config conf = new MapConfig(configMap);
+
+ YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+ yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+ LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+ }
+
+ @Test
+ public void testResourceMapWithInvalidVisibilityFailure() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceVisibility.INVALIDVISIBILITY");
+
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+ configMap.put("yarn.resources.myResource1.local.name", "readme");
+ configMap.put("yarn.resources.myResource1.local.type", "file");
+ configMap.put("yarn.resources.myResource1.local.visibility", "invalidVisibility");
+ Config conf = new MapConfig(configMap);
+
+ YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+ yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+ LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+ }
+
+ @Test
+ public void testResourceMapWithInvalidTypeFailure() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE");
+
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
+ configMap.put("yarn.resources.myResource1.local.name", "readme");
+ configMap.put("yarn.resources.myResource1.local.type", "invalidType");
+ configMap.put("yarn.resources.myResource1.local.visibility", "public");
+ Config conf = new MapConfig(configMap);
+
+ YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
+ yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
+ LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
new file mode 100644
index 0000000..11077f0
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestYarnJobFactory {
+
+  /** With no fs.*.impl entries in the job config, http and https both resolve to HttpFileSystem. */
+  @Test
+  public void testGetJobWithDefaultFsImpl() {
+    Configuration hadoopConfig = new YarnJobFactory()
+        .getJob(new MapConfig())
+        .client()
+        .yarnClient()
+        .getConfig();
+
+    String expectedImpl = HttpFileSystem.class.getName();
+    assertEquals(expectedImpl, hadoopConfig.get("fs.http.impl"));
+    assertEquals(expectedImpl, hadoopConfig.get("fs.https.impl"));
+  }
+
+  /** fs.*.impl entries from the job config, including new schemes, reach the YARN client config. */
+  @Test
+  public void testGetJobWithFsImplOverride() {
+    MapConfig jobConfig = new MapConfig(ImmutableMap.of(
+        "fs.http.impl", "org.apache.myHttp",
+        "fs.myscheme.impl", "org.apache.myScheme"));
+
+    Configuration hadoopConfig = new YarnJobFactory()
+        .getJob(jobConfig)
+        .client()
+        .yarnClient()
+        .getConfig();
+
+    assertEquals("org.apache.myHttp", hadoopConfig.get("fs.http.impl"));
+    assertEquals("org.apache.myScheme", hadoopConfig.get("fs.myscheme.impl"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala
deleted file mode 100644
index 110c0fc..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.samza.config.MapConfig
-import org.apache.samza.util.hadoop.HttpFileSystem
-import org.junit.Assert._
-import org.junit.Test
-
-
-/**
- * Checks that YarnJobFactory, given an empty job config, sets HttpFileSystem as the
- * implementation for both fs.http.impl and fs.https.impl in the YARN client's config.
- */
-class TestYarnJobFactory {
-
- @Test
- def testGetJob {
-
- val jobFactory = new YarnJobFactory
-
- val yarnJob = jobFactory.getJob(new MapConfig)
-
- // Hadoop configuration held by the job's YARN client.
- val hConfig = yarnJob.client.yarnClient.getConfig
-
- assertEquals(classOf[HttpFileSystem].getName, hConfig.get("fs.http.impl"))
-
- assertEquals(classOf[HttpFileSystem].getName, hConfig.get("fs.https.impl"))
-
- }
-}
-