You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:11:05 UTC
[65/76] [abbrv] hadoop git commit: YARN-5505. Create an agent-less
docker provider in the native-services framework. Contributed by Billie
Rinaldi
YARN-5505. Create an agent-less docker provider in the native-services framework. Contributed by Billie Rinaldi
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cb61fe3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cb61fe3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cb61fe3f
Branch: refs/heads/yarn-native-services
Commit: cb61fe3fa6784cc72f10f90456987bbaa55c8914
Parents: 86a29d4
Author: Jian He <ji...@apache.org>
Authored: Thu Sep 1 22:38:42 2016 +0800
Committer: Jian He <ji...@apache.org>
Committed: Wed Dec 7 13:00:06 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/slider/api/OptionKeys.java | 15 +-
.../org/apache/slider/client/SliderClient.java | 17 +-
.../org/apache/slider/common/SliderKeys.java | 22 +-
.../apache/slider/common/tools/SliderUtils.java | 4 +
.../slider/core/launch/AbstractLauncher.java | 18 +-
.../PublishedConfigurationOutputter.java | 6 +-
.../providers/AbstractClientProvider.java | 4 +-
.../providers/AbstractProviderService.java | 22 +-
.../slider/providers/ProviderService.java | 12 +-
.../apache/slider/providers/ProviderUtils.java | 1391 ++++++++++++++----
.../providers/agent/AgentClientProvider.java | 36 +-
.../slider/providers/agent/AgentKeys.java | 12 +-
.../providers/agent/AgentProviderService.java | 705 ++-------
.../providers/docker/DockerClientProvider.java | 96 ++
.../slider/providers/docker/DockerKeys.java | 32 +
.../providers/docker/DockerProviderFactory.java | 43 +
.../providers/docker/DockerProviderService.java | 355 +++++
.../slideram/SliderAMProviderService.java | 4 -
.../server/appmaster/SliderAppMaster.java | 39 +-
.../main/resources/org/apache/slider/slider.xml | 4 +
.../slider/providers/docker/appConfig.json | 42 +
.../slider/providers/docker/resources.json | 16 +
.../slider/providers/docker/test.template | 16 +
23 files changed, 1971 insertions(+), 940 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
index a035a99..434b1d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
@@ -41,7 +41,20 @@ public interface OptionKeys extends InternalKeys {
* Prefix for site.xml options: {@value}
*/
String SITE_XML_PREFIX = "site.";
-
+ /**
+ * Prefix for config file options: {@value}
+ */
+ String CONF_FILE_PREFIX = "conf.";
+ /**
+ * Prefix for package options: {@value}
+ */
+ String PKG_FILE_PREFIX = "pkg.";
+ /**
+ * Prefix for export options: {@value}
+ */
+ String EXPORT_PREFIX = "export.";
+ String TYPE_SUFFIX = ".type";
+ String NAME_SUFFIX = ".name";
/**
* Zookeeper quorum host list: {@value}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 3129f6f..5096bb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -151,7 +151,6 @@ import org.apache.slider.core.registry.YarnAppListClient;
import org.apache.slider.core.registry.docstore.ConfigFormat;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
import org.apache.slider.core.registry.docstore.PublishedExportsSet;
@@ -162,6 +161,7 @@ import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.docker.DockerClientProvider;
import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
@@ -2081,7 +2081,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
// add the tags if available
Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
- getApplicationDefinitionPath(appOperations));
+ appOperations);
Credentials credentials = null;
if (clusterSecure) {
@@ -2242,11 +2242,12 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
);
- // TODO: consider supporting apps that don't have an image path
- Path imagePath =
- extractImagePath(sliderFileSystem, internalOptions);
- if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
- log.debug("Registered image path {}", imagePath);
+ if (!(provider instanceof DockerClientProvider)) {
+ Path imagePath =
+ extractImagePath(sliderFileSystem, internalOptions);
+ if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
+ log.debug("Registered image path {}", imagePath);
+ }
}
// build the environment
@@ -3814,7 +3815,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
Path subPath = new Path(path1, appReport.getApplicationId()
.toString() + "/agent");
imagePath = subPath.toString();
- String pathStr = imagePath + "/" + AGENT_TAR;
+ String pathStr = imagePath + "/" + AgentKeys.AGENT_TAR;
try {
validateHDFSFile(sliderFileSystem, pathStr);
log.info("Slider agent package is properly installed at " + pathStr);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index 120b1fc..1484ee3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -81,6 +81,10 @@ public interface SliderKeys extends SliderXmlConfKeys {
String COMPONENT_SEPARATOR = "-";
String[] COMPONENT_KEYS_TO_SKIP = {"zookeeper.", "env.MALLOC_ARENA_MAX",
"site.fs.", "site.dfs."};
+ /**
+ * A component type for a client component
+ */
+ String COMPONENT_TYPE_CLIENT = "client";
/**
* Key for application version. This must be set in app_config/global {@value}
@@ -222,7 +226,6 @@ public interface SliderKeys extends SliderXmlConfKeys {
String SLIDER_JAR = "slider.jar";
String JCOMMANDER_JAR = "jcommander.jar";
String GSON_JAR = "gson.jar";
- String AGENT_TAR = "slider-agent.tar.gz";
String DEFAULT_APP_PKG = "appPkg.zip";
String DEFAULT_JVM_HEAP = "256M";
@@ -288,4 +291,21 @@ public interface SliderKeys extends SliderXmlConfKeys {
String SLIDER_CLASSPATH_EXTRA = "SLIDER_CLASSPATH_EXTRA";
String YARN_CONTAINER_PATH = "/node/container/";
+
+ String GLOBAL_CONFIG_TAG = "global";
+ String SYSTEM_CONFIGS = "system_configs";
+ String JAVA_HOME = "java_home";
+ String TWO_WAY_SSL_ENABLED = "ssl.server.client.auth";
+ String INFRA_RUN_SECURITY_DIR = "infra/run/security/";
+ String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt";
+
+ String AM_CONFIG_GENERATION = "am.config.generation";
+ String APP_CONF_DIR = "app/conf";
+
+ String APP_RESOURCES = "application.resources";
+ String APP_RESOURCES_DIR = "app/resources";
+ String PER_COMPONENT = "per.component";
+ String PER_GROUP = "per.group";
+
+ String APP_PACKAGES_DIR = "app/packages";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
index e9f65ba..f773982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
@@ -183,6 +183,10 @@ public final class SliderUtils {
return !isUnset(s);
}
+ public static boolean isEmpty(List l) {
+ return l == null || l.isEmpty();
+ }
+
/**
* Probe for a list existing and not being empty
* @param l list
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
index 5a3eb3d..aefc0de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
@@ -52,6 +52,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.slider.providers.docker.DockerKeys.DEFAULT_DOCKER_NETWORK;
+
/**
* Launcher of applications: base class
*/
@@ -79,6 +81,7 @@ public abstract class AbstractLauncher extends Configured {
protected LogAggregationContext logAggregationContext;
protected boolean yarnDockerMode = false;
protected String dockerImage;
+ protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
protected String yarnContainerMountPoints;
protected String runPrivilegedContainer;
@@ -232,7 +235,8 @@ public abstract class AbstractLauncher extends Configured {
if(yarnDockerMode){
Map<String, String> env = containerLaunchContext.getEnvironment();
env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
- env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);//if yarnDockerMode, then dockerImage is set
+ env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);
+ env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", dockerNetwork);
env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER", runPrivilegedContainer);
StringBuilder sb = new StringBuilder();
for (Entry<String,String> mount : mountPaths.entrySet()) {
@@ -517,6 +521,10 @@ public abstract class AbstractLauncher extends Configured {
this.dockerImage = dockerImage;
}
+ public void setDockerNetwork(String dockerNetwork) {
+ this.dockerNetwork = dockerNetwork;
+ }
+
public void setYarnContainerMountPoints(String yarnContainerMountPoints) {
this.yarnContainerMountPoints = yarnContainerMountPoints;
}
@@ -525,4 +533,12 @@ public abstract class AbstractLauncher extends Configured {
this.runPrivilegedContainer = runPrivilegedContainer;
}
+ public void setRunPrivilegedContainer(boolean runPrivilegedContainer) {
+ if (runPrivilegedContainer) {
+ this.runPrivilegedContainer = Boolean.toString(true);
+ } else {
+ this.runPrivilegedContainer = Boolean.toString(false);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
index 9bdcfcb..4ec513c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
@@ -39,6 +39,8 @@ import java.util.Properties;
*/
public abstract class PublishedConfigurationOutputter {
+ private static final String COMMENTS = "Generated by Apache Slider";
+
protected final PublishedConfiguration owner;
protected PublishedConfigurationOutputter(PublishedConfiguration owner) {
@@ -143,13 +145,13 @@ public abstract class PublishedConfigurationOutputter {
@Override
public void save(OutputStream out) throws IOException {
- properties.store(out, "");
+ properties.store(out, COMMENTS);
}
public String asString() throws IOException {
StringWriter sw = new StringWriter();
- properties.store(sw, "");
+ properties.store(sw, COMMENTS);
return sw.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
index 510de5d..f59c347 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
@@ -216,8 +216,8 @@ public abstract class AbstractClientProvider extends Configured {
* Return a set of application specific string tags.
* @return the set of tags.
*/
- public Set<String> getApplicationTags (SliderFileSystem fileSystem,
- String appDef) throws SliderException {
+ public Set<String> getApplicationTags(SliderFileSystem fileSystem,
+ ConfTreeOperations appConf) throws SliderException {
return Collections.emptySet();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index 92766f5..19fa07b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -139,6 +140,19 @@ public abstract class AbstractProviderService
}
/**
+ * Load default Configuration
+ * @param confDir configuration directory
+ * @return configuration
+ * @throws BadCommandArgumentsException
+ * @throws IOException
+ */
+ @Override
+ public Configuration loadProviderConfigurationInformation(File confDir)
+ throws BadCommandArgumentsException, IOException {
+ return new Configuration(false);
+ }
+
+ /**
* Load a specific XML configuration file for the provider config
* @param confDir configuration directory
* @param siteXMLFilename provider-specific filename
@@ -369,8 +383,6 @@ public abstract class AbstractProviderService
@Override
public void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException {
this.amWebAPI = amWebURI;
@@ -422,4 +434,10 @@ public abstract class AbstractProviderService
public void rebuildContainerDetails(List<Container> liveContainers,
String applicationId, Map<Integer, ProviderRole> providerRoles) {
}
+
+ @Override
+ public boolean processContainerStatus(ContainerId containerId,
+ ContainerStatus status) {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index 3f24665..b62510a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.conf.AggregateConf;
@@ -189,13 +190,9 @@ public interface ProviderService extends ProviderCore,
/**
* Prior to going live -register the initial service registry data
* @param amWebURI URL to the AM. This may be proxied, so use relative paths
- * @param agentOpsURI URI for agent operations. This will not be proxied
- * @param agentStatusURI URI For agent status. Again: no proxy
* @param serviceRecord service record to build up
*/
void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException;
@@ -216,4 +213,11 @@ public interface ProviderService extends ProviderCore,
*/
void rebuildContainerDetails(List<Container> liveContainers,
String applicationId, Map<Integer, ProviderRole> providerRoles);
+
+ /**
+ * Process container status
+ * @return true if status needs to be requested again, false otherwise
+ */
+ boolean processContainerStatus(ContainerId containerId,
+ ContainerStatus status);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb61fe3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
index 07d106b..47556f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
@@ -18,16 +18,29 @@
package org.apache.slider.providers;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.slider.api.ClusterDescription;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
+import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
@@ -35,28 +48,50 @@ import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.exceptions.SliderInternalStateException;
+import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.ConfigUtils;
+import org.apache.slider.core.registry.docstore.ExportEntry;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.server.appmaster.state.RoleInstance;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.apache.slider.server.services.security.CertificateManager;
+import org.apache.slider.server.services.security.SecurityStore;
+import org.apache.slider.server.services.security.StoresGenerator;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.LinkedList;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.regex.Pattern;
/**
- * this is a factoring out of methods handy for providers. It's bonded to a log at
- * construction time
+ * This is a factoring out of methods handy for providers. It's bonded to a log
+ * at construction time.
*/
-public class ProviderUtils implements RoleKeys {
+public class ProviderUtils implements RoleKeys, SliderKeys {
protected final Logger log;
/**
- * Create an instace
+ * Create an instance
* @param log log directory to use -usually the provider
*/
@@ -66,14 +101,14 @@ public class ProviderUtils implements RoleKeys {
/**
* Add oneself to the classpath. This does not work
- * on minicluster test runs where the JAR is not built up
+ * on minicluster test runs where the JAR is not built up.
* @param providerResources map of provider resources to add these entries to
* @param provider provider to add
* @param jarName name of the jar to use
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
* @param libdir relative directory to place resources
- * @param miniClusterTestRun
+ * @param miniClusterTestRun true if minicluster is being used
* @return true if the class was found in a JAR
*
* @throws FileNotFoundException if the JAR was not found and this is NOT
@@ -81,7 +116,8 @@ public class ProviderUtils implements RoleKeys {
* @throws IOException IO problems
* @throws SliderException any Slider problem
*/
- public static boolean addProviderJar(Map<String, LocalResource> providerResources,
+ public static boolean addProviderJar(
+ Map<String, LocalResource> providerResources,
Object provider,
String jarName,
SliderFileSystem sliderFileSystem,
@@ -108,13 +144,14 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * Add/overwrite the agent tarball (overwritten every time application is restarted)
- * @param provider
- * @param tarName
- * @param sliderFileSystem
- * @param agentDir
+ * Add/overwrite the agent tarball (overwritten every time application is
+ * restarted).
+ * @param provider an instance of a provider class
+ * @param tarName name of the tarball to upload
+ * @param sliderFileSystem the file system
+ * @param agentDir directory to upload to
* @return true the location could be determined and the file added
- * @throws IOException
+ * @throws IOException if the upload fails
*/
public static boolean addAgentTar(Object provider,
String tarName,
@@ -125,100 +162,58 @@ public class ProviderUtils implements RoleKeys {
if(localFile != null) {
String parentDir = localFile.getParent();
Path agentTarPath = new Path(parentDir, tarName);
- sliderFileSystem.getFileSystem().copyFromLocalFile(false, true, agentTarPath, agentDir);
+ sliderFileSystem.getFileSystem().copyFromLocalFile(false, true,
+ agentTarPath, agentDir);
return true;
}
return false;
}
/**
- * Add a set of dependencies to the provider resources being built up,
- * by copying them from the local classpath to the remote one, then
- * registering them
- * @param providerResources map of provider resources to add these entries to
- * @param sliderFileSystem target filesystem
- * @param tempPath path in the cluster FS for temp files
- * @param libdir relative directory to place resources
- * @param resources list of resource names (e.g. "hbase.jar"
- * @param classes list of classes where classes[i] refers to a class in
- * resources[i]
- * @throws IOException IO problems
- * @throws SliderException any Slider problem
- */
- public static void addDependencyJars(Map<String, LocalResource> providerResources,
- SliderFileSystem sliderFileSystem,
- Path tempPath,
- String libdir,
- String[] resources,
- Class[] classes
- ) throws
- IOException,
- SliderException {
- if (resources.length != classes.length) {
- throw new SliderInternalStateException(
- "mismatch in Jar names [%d] and classes [%d]",
- resources.length,
- classes.length);
- }
- int size = resources.length;
- for (int i = 0; i < size; i++) {
- String jarName = resources[i];
- Class clazz = classes[i];
- SliderUtils.putJar(providerResources,
- sliderFileSystem,
- clazz,
- tempPath,
- libdir,
- jarName);
- }
-
- }
-
- /**
- * Loads all dependency jars from the default path
+ * Loads all dependency jars from the default path.
* @param providerResources map of provider resources to add these entries to
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
* @param libDir relative directory to place resources
* @param libLocalSrcDir explicitly supplied local libs dir
- * @throws IOException
- * @throws SliderException
- */
- public static void addAllDependencyJars(Map<String, LocalResource> providerResources,
- SliderFileSystem sliderFileSystem,
- Path tempPath,
- String libDir,
- String libLocalSrcDir)
+ * @throws IOException trouble copying to HDFS
+ * @throws SliderException trouble copying to HDFS
+ */
+ public static void addAllDependencyJars(
+ Map<String, LocalResource> providerResources,
+ SliderFileSystem sliderFileSystem,
+ Path tempPath,
+ String libDir,
+ String libLocalSrcDir)
throws IOException, SliderException {
- String libSrcToUse = libLocalSrcDir;
if (SliderUtils.isSet(libLocalSrcDir)) {
File file = new File(libLocalSrcDir);
if (!file.exists() || !file.isDirectory()) {
- throw new BadCommandArgumentsException("Supplied lib src dir %s is not valid", libLocalSrcDir);
+ throw new BadCommandArgumentsException(
+ "Supplied lib src dir %s is not valid", libLocalSrcDir);
}
}
- SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, libDir, libSrcToUse);
+ SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath,
+ libDir, libLocalSrcDir);
}
+
/**
- * build the log directory
- * @return the log dir
+ * Validate the requested number of instances of a component.
+ * <p>
+ * If max <= 0: min <= count
+ * If max > 0: min <= count <= max
+ * @param instanceDescription configuration
+ * @param name node class name
+ * @param min requested heap size
+ * @param max maximum value.
+ * @throws BadCommandArgumentsException if the values are out of range
*/
- public String getLogdir() throws IOException {
- String logdir = System.getenv("LOGDIR");
- if (logdir == null) {
- logdir =
- SliderKeys.TMP_LOGDIR_PREFIX + UserGroupInformation.getCurrentUser().getShortUserName();
- }
- return logdir;
- }
-
-
public void validateNodeCount(AggregateConf instanceDescription,
- String name, int min, int max) throws
- BadCommandArgumentsException {
+ String name, int min, int max)
+ throws BadCommandArgumentsException {
MapOperations component =
- instanceDescription.getResourceOperations().getComponent(name);
+ instanceDescription.getResourceOperations().getComponent(name);
int count;
if (component == null) {
count = 0;
@@ -229,7 +224,7 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * Validate the node count and heap size values of a node class
+ * Validate the count is between min and max.
* <p>
* If max <= 0: min <= count
* If max > 0: min <= count <= max
@@ -256,33 +251,36 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * copy all options beginning site. into the site.xml
- * @param clusterSpec cluster specification
- * @param sitexml map for XML file to build up
+ * Copy options beginning with "site.configName." prefix from options map
+ * to sitexml map, removing the prefix and substituting the tokens
+ * specified in the tokenMap.
+ * @param options source map
+ * @param sitexml destination map
+ * @param configName optional ".configName" portion of the prefix
+ * @param tokenMap key/value pairs to substitute into the option values
*/
- public void propagateSiteOptions(ClusterDescription clusterSpec,
- Map<String, String> sitexml) {
- Map<String, String> options = clusterSpec.options;
- propagateSiteOptions(options, sitexml);
- }
-
- public void propagateSiteOptions(Map<String, String> options,
- Map<String, String> sitexml) {
- propagateSiteOptions(options, sitexml, "");
- }
-
public void propagateSiteOptions(Map<String, String> options,
- Map<String, String> sitexml,
- String configName) {
- propagateSiteOptions(options, sitexml, configName, null);
+ Map<String, String> sitexml,
+ String configName,
+ Map<String,String> tokenMap) {
+ String prefix = OptionKeys.SITE_XML_PREFIX +
+ (!configName.isEmpty() ? configName + "." : "");
+ propagateOptions(options, sitexml, tokenMap, prefix);
}
- public void propagateSiteOptions(Map<String, String> options,
+ /**
+ * Copy options beginning with prefix from options map
+ * to sitexml map, removing the prefix and substituting the tokens
+ * specified in the tokenMap.
+ * @param options source map
+ * @param sitexml destination map
+ * @param tokenMap key/value pairs to substitute into the option values
+ * @param prefix which options to copy to destination map
+ */
+ public void propagateOptions(Map<String, String> options,
Map<String, String> sitexml,
- String configName,
- Map<String,String> tokenMap) {
- String prefix = OptionKeys.SITE_XML_PREFIX +
- (!configName.isEmpty() ? configName + "." : "");
+ Map<String,String> tokenMap,
+ String prefix) {
for (Map.Entry<String, String> entry : options.entrySet()) {
String key = entry.getKey();
if (key.startsWith(prefix)) {
@@ -302,229 +300,1038 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * Propagate an option from the cluster specification option map
- * to the site XML map, using the site key for the name
- * @param global global config spec
- * @param optionKey key in the option map
- * @param sitexml map for XML file to build up
- * @param siteKey key to assign the value to in the site XML
- * @throws BadConfigException if the option is missing from the cluster spec
- */
- public void propagateOption(MapOperations global,
- String optionKey,
- Map<String, String> sitexml,
- String siteKey) throws BadConfigException {
- sitexml.put(siteKey, global.getMandatoryOption(optionKey));
- }
-
-
- /**
- * Build the image dir. This path is relative and only valid at the far end
- * @param instanceDefinition instance definition
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory*
- */
- public String buildPathToHomeDir(AggregateConf instanceDefinition,
- String bindir,
- String script) throws
- FileNotFoundException,
- BadConfigException {
- MapOperations globalOptions =
- instanceDefinition.getInternalOperations().getGlobalOptions();
- String applicationHome =
- globalOptions.get(InternalKeys.INTERNAL_APPLICATION_HOME);
- String imagePath =
- globalOptions.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
- return buildPathToHomeDir(imagePath, applicationHome, bindir, script);
- }
-
- public String buildPathToHomeDir(String imagePath,
- String applicationHome,
- String bindir, String script) throws
- FileNotFoundException {
- String path;
- File scriptFile;
- if (imagePath != null) {
- File tarball = new File(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR);
- scriptFile = findBinScriptInExpandedArchive(tarball, bindir, script);
- // now work back from the script to build the relative path
- // to the binary which will be valid remote or local
- StringBuilder builder = new StringBuilder();
- builder.append(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR);
- builder.append("/");
- //for the script, we want the name of ../..
- File archive = scriptFile.getParentFile().getParentFile();
- builder.append(archive.getName());
- path = builder.toString();
+ * Substitute tokens into option map values, returning a new map.
+ * @param options source map
+ * @param tokenMap key/value pairs to substitute into the option values
+ * @return map with substituted values
+ */
+ public Map<String, String> filterSiteOptions(Map<String, String> options,
+ Map<String, String> tokenMap) {
+ String prefix = OptionKeys.SITE_XML_PREFIX;
+ Map<String, String> filteredOptions = new HashMap<>();
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ String value = entry.getValue();
+ if (tokenMap != null) {
+ for (Map.Entry<String,String> token : tokenMap.entrySet()) {
+ value = value.replaceAll(Pattern.quote(token.getKey()),
+ token.getValue());
+ }
+ }
+ filteredOptions.put(key, value);
+ }
+ }
+ return filteredOptions;
+ }
+
+ /**
+ * Get resource requirements from a String value. If value isn't specified,
+ * use the default value. If value is greater than max, use the max value.
+ * @param val string value
+ * @param defVal default value
+ * @param maxVal maximum value
+ * @return int resource requirement
+ */
+ public int getRoleResourceRequirement(String val,
+ int defVal,
+ int maxVal) {
+ if (val==null) {
+ val = Integer.toString(defVal);
+ }
+ Integer intVal;
+ if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) {
+ intVal = maxVal;
+ } else {
+ intVal = Integer.decode(val);
+ }
+ return intVal;
+ }
+
+ /**
+ * Localize the service keytabs for the application.
+ * @param launcher container launcher
+ * @param instanceDefinition app specification
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws IOException trouble uploading to HDFS
+ */
+ public void localizeServiceKeytabs(ContainerLauncher launcher,
+ AggregateConf instanceDefinition, SliderFileSystem fileSystem,
+ String clusterName) throws IOException {
+ ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+ String keytabPathOnHost = appConf.getComponent(COMPONENT_AM).get(
+ SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+ if (SliderUtils.isUnset(keytabPathOnHost)) {
+ String amKeytabName = appConf.getComponent(COMPONENT_AM).get(
+ SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+ String keytabDir = appConf.getComponent(COMPONENT_AM).get(
+ SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
+ // we need to localize the keytab files in the directory
+ Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
+ clusterName);
+ boolean serviceKeytabsDeployed = false;
+ if (fileSystem.getFileSystem().exists(keytabDirPath)) {
+ FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(
+ keytabDirPath);
+ LocalResource keytabRes;
+ for (FileStatus keytab : keytabs) {
+ if (!amKeytabName.equals(keytab.getPath().getName())
+ && keytab.getPath().getName().endsWith(".keytab")) {
+ serviceKeytabsDeployed = true;
+ log.info("Localizing keytab {}", keytab.getPath().getName());
+ keytabRes = fileSystem.createAmResource(keytab.getPath(),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(KEYTAB_DIR + "/" +
+ keytab.getPath().getName(),
+ keytabRes);
+ }
+ }
+ }
+ if (!serviceKeytabsDeployed) {
+ log.warn("No service keytabs for the application have been localized. "
+ + "If the application requires keytabs for secure operation, "
+ + "please ensure that the required keytabs have been uploaded "
+ + "to the folder {}", keytabDirPath);
+ }
+ }
+ }
+
+ /**
+ * Return whether two-way SSL is enabled for Agent / AM communication.
+ * @param amComponent component specification
+ * @return true if enabled
+ */
+ public boolean hasTwoWaySSLEnabled(MapOperations amComponent) {
+ return amComponent != null ?
+ amComponent.getOptionBool(TWO_WAY_SSL_ENABLED, false) : false;
+ }
+
+ /**
+ * Generate and localize SSL certs for Agent / AM communication
+ * @param launcher container launcher
+ * @param container allocated container information
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws SliderException certs cannot be generated/uploaded
+ */
+ public void localizeContainerSSLResources(ContainerLauncher launcher,
+ Container container, SliderFileSystem fileSystem, String clusterName)
+ throws SliderException {
+ try {
+ // localize server cert
+ Path certsDir = fileSystem.buildClusterSecurityDirPath(clusterName);
+ LocalResource certResource = fileSystem.createAmResource(
+ new Path(certsDir, CRT_FILE_NAME),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(CERT_FILE_LOCALIZATION_PATH, certResource);
+
+ // generate and localize agent cert
+ CertificateManager certMgr = new CertificateManager();
+ String hostname = container.getNodeId().getHost();
+ String containerId = container.getId().toString();
+ certMgr.generateContainerCertificate(hostname, containerId);
+ LocalResource agentCertResource = fileSystem.createAmResource(
+ uploadSecurityResource(
+ CertificateManager.getAgentCertficateFilePath(containerId),
+ fileSystem, clusterName), LocalResourceType.FILE);
+ // still using hostname as file name on the agent side, but the files
+ // do end up under the specific container's file space
+ launcher.addLocalResource(INFRA_RUN_SECURITY_DIR + hostname +
+ ".crt", agentCertResource);
+ LocalResource agentKeyResource = fileSystem.createAmResource(
+ uploadSecurityResource(
+ CertificateManager.getAgentKeyFilePath(containerId), fileSystem,
+ clusterName),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(INFRA_RUN_SECURITY_DIR + hostname +
+ ".key", agentKeyResource);
+
+ } catch (Exception e) {
+ throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e,
+ "Unable to localize certificates. Two-way SSL cannot be enabled");
+ }
+ }
+
+ /**
+ * Upload a local file to the cluster security dir in HDFS. If the file
+ * already exists, it is not replaced.
+ * @param resource file to upload
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @return Path of the uploaded file
+ * @throws IOException file cannot be uploaded
+ */
+ private Path uploadSecurityResource(File resource,
+ SliderFileSystem fileSystem, String clusterName) throws IOException {
+ Path certsDir = fileSystem.buildClusterSecurityDirPath(clusterName);
+ return uploadResource(resource, fileSystem, certsDir);
+ }
+
+ /**
+ * Upload a local file to the cluster resources dir in HDFS. If the file
+ * already exists, it is not replaced.
+ * @param resource file to upload
+ * @param fileSystem file system
+ * @param roleName optional subdirectory (for component-specific resources)
+ * @param clusterName app name
+ * @return Path of the uploaded file
+ * @throws IOException file cannot be uploaded
+ */
+ private Path uploadResource(File resource, SliderFileSystem fileSystem,
+ String roleName, String clusterName) throws IOException {
+ Path dir;
+ if (roleName == null) {
+ dir = fileSystem.buildClusterResourcePath(clusterName);
+ } else {
+ dir = fileSystem.buildClusterResourcePath(clusterName, roleName);
+ }
+ return uploadResource(resource, fileSystem, dir);
+ }
+ /**
+ * Upload a local file to a specified HDFS directory. If the file already
+ * exists, it is not replaced.
+ * @param resource file to upload
+ * @param fileSystem file system
+ * @param parentDir destination directory in HDFS
+ * @return Path of the uploaded file
+ * @throws IOException file cannot be uploaded
+ */
+ private synchronized Path uploadResource(File resource,
+ SliderFileSystem fileSystem, Path parentDir) throws IOException {
+ if (!fileSystem.getFileSystem().exists(parentDir)) {
+ fileSystem.getFileSystem().mkdirs(parentDir,
+ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+ }
+ Path destPath = new Path(parentDir, resource.getName());
+ if (!fileSystem.getFileSystem().exists(destPath)) {
+ FSDataOutputStream os = null;
+ try {
+ os = fileSystem.getFileSystem().create(destPath);
+ byte[] contents = FileUtils.readFileToByteArray(resource);
+ os.write(contents, 0, contents.length);
+ os.flush();
+ } finally {
+ IOUtils.closeStream(os);
+ }
+ log.info("Uploaded {} to localization path {}", resource, destPath);
} else {
- // using a home directory which is required to be present on
- // the local system -so will be absolute and resolvable
- File homedir = new File(applicationHome);
- path = homedir.getAbsolutePath();
+ log.info("Resource {} already existed at localization path {}", resource,
+ destPath);
+ }
- //this is absolute, resolve its entire path
- SliderUtils.verifyIsDir(homedir, log);
- File bin = new File(homedir, bindir);
- SliderUtils.verifyIsDir(bin, log);
- scriptFile = new File(bin, script);
- SliderUtils.verifyFileExists(scriptFile, log);
+ while (!fileSystem.getFileSystem().exists(destPath)) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- return path;
+
+ fileSystem.getFileSystem().setPermission(destPath,
+ new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
+
+ return destPath;
}
-
/**
- * Build the image dir. This path is relative and only valid at the far end
- * @param instance instance options
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory*
- */
- public String buildPathToScript(AggregateConf instance,
- String bindir,
- String script) throws FileNotFoundException {
- return buildPathToScript(instance.getInternalOperations(), bindir, script);
- }
- /**
- * Build the image dir. This path is relative and only valid at the far end
- * @param internal internal options
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory*
- */
- public String buildPathToScript(ConfTreeOperations internal,
- String bindir,
- String script) throws FileNotFoundException {
-
- String homedir = buildPathToHomeDir(
- internal.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH),
- internal.get(InternalKeys.INTERNAL_APPLICATION_HOME),
- bindir,
- script);
- return buildScriptPath(bindir, script, homedir);
+ * Write a configuration property map to a local file in a specified format.
+ * @param fileSystem file system
+ * @param file destination file
+ * @param configFormat file format
+ * @param configFileDN file description
+ * @param config properties to save to the file
+ * @param clusterName app name
+ * @throws IOException file cannot be created
+ */
+ private void createConfigFile(SliderFileSystem fileSystem, File file,
+ ConfigFormat configFormat, String configFileDN,
+ Map<String, String> config, String clusterName) throws IOException {
+ log.info("Writing {} file {}", configFormat, file);
+
+ ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
+ fileSystem, clusterName, file.getName());
+ PublishedConfiguration publishedConfiguration =
+ new PublishedConfiguration(configFileDN,
+ config.entrySet());
+ PublishedConfigurationOutputter configurationOutputter =
+ PublishedConfigurationOutputter.createOutputter(configFormat,
+ publishedConfiguration);
+ configurationOutputter.save(file);
}
-
-
- public String buildScriptPath(String bindir, String script, String homedir) {
- StringBuilder builder = new StringBuilder(homedir);
- builder.append("/");
- builder.append(bindir);
- builder.append("/");
- builder.append(script);
- return builder.toString();
+ /**
+ * Determine config files requested in the appConf, generate the files, and
+ * localize them.
+ * @param launcher container launcher
+ * @param roleName component name
+ * @param roleGroup component group
+ * @param appConf app configurations
+ * @param configs configurations grouped by config name
+ * @param env environment variables
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws IOException file(s) cannot be uploaded
+ * @throws BadConfigException file name not specified or file format not
+ * supported
+ */
+ public void localizeConfigFiles(ContainerLauncher launcher,
+ String roleName, String roleGroup,
+ ConfTreeOperations appConf,
+ Map<String, Map<String, String>> configs,
+ MapOperations env,
+ SliderFileSystem fileSystem,
+ String clusterName)
+ throws IOException, BadConfigException {
+ for (Entry<String, Map<String, String>> configEntry : configs.entrySet()) {
+ String configFileName = appConf.getComponentOpt(roleGroup,
+ OptionKeys.CONF_FILE_PREFIX + configEntry.getKey() + OptionKeys
+ .NAME_SUFFIX, null);
+ String configFileType = appConf.getComponentOpt(roleGroup,
+ OptionKeys.CONF_FILE_PREFIX + configEntry.getKey() + OptionKeys
+ .TYPE_SUFFIX, null);
+ if (configFileName == null && configFileType == null) {
+ // config file not requested, so continue
+ continue;
+ }
+ if (configFileName == null) {
+ throw new BadConfigException("Config file name null for " +
+ configEntry.getKey());
+ }
+ if (configFileType == null) {
+ throw new BadConfigException("Config file type null for " +
+ configEntry.getKey());
+ }
+ ConfigFormat configFormat = ConfigFormat.resolve(configFileType);
+ if (configFormat == null) {
+ throw new BadConfigException("Config format " + configFormat +
+ " doesn't exist");
+ }
+ localizeConfigFile(launcher, roleName, roleGroup, configEntry.getKey(),
+ configFormat, configFileName, configs, env, fileSystem, clusterName);
+ }
}
+ /**
+ * Create and localize a config file.
+ * @param launcher container launcher
+ * @param roleName component name
+ * @param roleGroup component group
+ * @param configFileDN config description/name
+ * @param configFormat config format
+ * @param configFileName config file name
+ * @param configs configs grouped by config description/name
+ * @param env environment variables
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws IOException file cannot be uploaded
+ */
+ public void localizeConfigFile(ContainerLauncher launcher,
+ String roleName, String roleGroup,
+ String configFileDN, ConfigFormat configFormat, String configFileName,
+ Map<String, Map<String, String>> configs,
+ MapOperations env,
+ SliderFileSystem fileSystem,
+ String clusterName)
+ throws IOException {
+ if (launcher == null) {
+ return;
+ }
+ Map<String, String> config = ConfigUtils.replacePropsInConfig(
+ configs.get(configFileDN), env.options);
+ String fileName = ConfigUtils.replaceProps(config, configFileName);
+ File localFile = new File(RESOURCE_DIR);
+ if (!localFile.exists()) {
+ if (!localFile.mkdir()) {
+ throw new IOException(RESOURCE_DIR + " could not be created!");
+ }
+ }
+ localFile = new File(localFile, new File(fileName).getName());
- public static String convertToAppRelativePath(File file) {
- return convertToAppRelativePath(file.getPath());
+ String folder = null;
+ if ("true".equals(config.get(PER_COMPONENT))) {
+ folder = roleName;
+ } else if ("true".equals(config.get(PER_GROUP))) {
+ folder = roleGroup;
+ }
+
+ log.info("Localizing {} configs to config file {} (destination {}) " +
+ "based on {} configs", config.size(), localFile, fileName,
+ configFileDN);
+ createConfigFile(fileSystem, localFile, configFormat, configFileDN, config,
+ clusterName);
+ Path destPath = uploadResource(localFile, fileSystem, folder, clusterName);
+ LocalResource configResource = fileSystem.createAmResource(destPath,
+ LocalResourceType.FILE);
+
+ File destFile = new File(fileName);
+ if (destFile.isAbsolute()) {
+ launcher.addLocalResource(
+ RESOURCE_DIR + "/" + destFile.getName(),
+ configResource, fileName);
+ } else {
+ launcher.addLocalResource(APP_CONF_DIR + "/" + fileName,
+ configResource);
+ }
}
- public static String convertToAppRelativePath(String path) {
- return ApplicationConstants.Environment.PWD.$() + "/" + path;
+ /**
+ * Generate and localize security stores requested by the app. Also perform
+ * last-minute substitution of cluster name into credentials strings.
+ * @param launcher container launcher
+ * @param container allocated container information
+ * @param role component name
+ * @param fileSystem file system
+ * @param instanceDefinition app specification
+ * @param compOps component specification
+ * @param clusterName app name
+ * @throws SliderException stores cannot be generated/uploaded
+ * @throws IOException stores cannot be generated/uploaded
+ */
+ public void localizeContainerSecurityStores(ContainerLauncher launcher,
+ Container container,
+ String role,
+ SliderFileSystem fileSystem,
+ AggregateConf instanceDefinition,
+ MapOperations compOps,
+ String clusterName)
+ throws SliderException, IOException {
+ // substitute CLUSTER_NAME into credentials
+ Map<String,List<String>> newcred = new HashMap<>();
+ for (Entry<String,List<String>> entry :
+ instanceDefinition.getAppConf().credentials.entrySet()) {
+ List<String> resultList = new ArrayList<>();
+ for (String v : entry.getValue()) {
+ resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"),
+ clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
+ clusterName));
+ }
+ newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"),
+ clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
+ clusterName),
+ resultList);
+ }
+ instanceDefinition.getAppConf().credentials = newcred;
+
+ // generate and localize security stores
+ SecurityStore[] stores = generateSecurityStores(container, role,
+ instanceDefinition, compOps);
+ for (SecurityStore store : stores) {
+ LocalResource keystoreResource = fileSystem.createAmResource(
+ uploadSecurityResource(store.getFile(), fileSystem, clusterName),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(String.format("secstores/%s-%s.p12",
+ store.getType(), role),
+ keystoreResource);
+ }
+ }
+
+ /**
+ * Generate security stores requested by the app.
+ * @param container allocated container information
+ * @param role component name
+ * @param instanceDefinition app specification
+ * @param compOps component specification
+ * @return security stores
+ * @throws SliderException stores cannot be generated
+ * @throws IOException stores cannot be generated
+ */
+ private SecurityStore[] generateSecurityStores(Container container,
+ String role,
+ AggregateConf instanceDefinition,
+ MapOperations compOps)
+ throws SliderException, IOException {
+ return StoresGenerator.generateSecurityStores(
+ container.getNodeId().getHost(), container.getId().toString(),
+ role, instanceDefinition, compOps);
}
+ /**
+ * Return whether security stores are requested by the app.
+ * @param compOps component specification
+ * @return true if stores are requested
+ */
+ public boolean areStoresRequested(MapOperations compOps) {
+ return compOps != null ? compOps.
+ getOptionBool(COMP_STORES_REQUIRED_KEY, false) : false;
+ }
- public static void validatePathReferencesLocalDir(String meaning, String path)
- throws BadConfigException {
- File file = new File(path);
- if (!file.exists()) {
- throw new BadConfigException("%s directory %s not found", meaning, file);
+ /**
+ * Localize application tarballs and other resources requested by the app.
+ * @param launcher container launcher
+ * @param fileSystem file system
+ * @param appConf app configurations
+ * @param roleGroup component group
+ * @param clusterName app name
+ * @throws IOException resources cannot be uploaded
+ * @throws BadConfigException package name or type is not specified
+ */
+ public void localizePackages(ContainerLauncher launcher,
+ SliderFileSystem fileSystem, ConfTreeOperations appConf, String roleGroup,
+ String clusterName) throws IOException, BadConfigException {
+ for (Entry<String, Map<String, String>> pkg :
+ getPackages(roleGroup, appConf).entrySet()) {
+ String pkgName = pkg.getValue().get(OptionKeys.NAME_SUFFIX);
+ String pkgType = pkg.getValue().get(OptionKeys.TYPE_SUFFIX);
+ Path pkgPath = fileSystem.buildResourcePath(pkgName);
+ if (!fileSystem.isFile(pkgPath)) {
+ pkgPath = fileSystem.buildResourcePath(clusterName,
+ pkgName);
+ }
+ if (!fileSystem.isFile(pkgPath)) {
+ throw new IOException("Package doesn't exist as a resource: " +
+ pkgName);
+ }
+ log.info("Adding resource {}", pkgName);
+ LocalResourceType type = LocalResourceType.FILE;
+ if ("archive".equals(pkgType)) {
+ type = LocalResourceType.ARCHIVE;
+ }
+ LocalResource packageResource = fileSystem.createAmResource(
+ pkgPath, type);
+ launcher.addLocalResource(APP_PACKAGES_DIR, packageResource);
}
- if (!file.isDirectory()) {
- throw new BadConfigException("%s is not a directory: %s", meaning, file);
+ }
+
+ /**
+ * Build a map of configuration description/name to configuration key/value
+ * properties, with all known tokens substituted into the property values.
+ * @param appConf app configurations
+ * @param internalsConf internal configurations
+ * @param containerId container ID
+ * @param roleName component name
+ * @param roleGroup component group
+ * @param amState access to AM state
+ * @return configuration properties grouped by config description/name
+ */
+ public Map<String, Map<String, String>> buildConfigurations(
+ ConfTreeOperations appConf, ConfTreeOperations internalsConf,
+ String containerId, String roleName, String roleGroup,
+ StateAccessForProviders amState) {
+
+ Map<String, Map<String, String>> configurations = new TreeMap<>();
+ Map<String, String> tokens = getStandardTokenMap(appConf,
+ internalsConf, roleName, roleGroup, containerId);
+
+ Set<String> configs = new HashSet<>();
+ configs.addAll(getApplicationConfigurationTypes(roleGroup, appConf));
+ configs.addAll(getSystemConfigurationsRequested(appConf));
+
+ for (String configType : configs) {
+ addNamedConfiguration(configType, appConf.getGlobalOptions().options,
+ configurations, tokens, amState);
+ if (appConf.getComponent(roleGroup) != null) {
+ addNamedConfiguration(configType,
+ appConf.getComponent(roleGroup).options, configurations, tokens,
+ amState);
+ }
}
+
+ //do a final replacement of re-used configs
+ dereferenceAllConfigs(configurations);
+
+ return configurations;
}
/**
- * get the user name
- * @return the user name
+ * Substitute "site." prefixed configuration values into other configuration
+ * values where needed. The format for these substitutions is that
+ * {@literal ${@//site/configDN/key}} will be replaced by the value for the
+ * "site.configDN.key" property.
+ * @param configurations configuration properties grouped by config
+ * description/name
*/
- public String getUserName() throws IOException {
- return UserGroupInformation.getCurrentUser().getShortUserName();
+ public void dereferenceAllConfigs(
+ Map<String, Map<String, String>> configurations) {
+ Map<String, String> allConfigs = new HashMap<>();
+ String lookupFormat = "${@//site/%s/%s}";
+ for (String configType : configurations.keySet()) {
+ Map<String, String> configBucket = configurations.get(configType);
+ for (String configName : configBucket.keySet()) {
+ allConfigs.put(String.format(lookupFormat, configType, configName),
+ configBucket.get(configName));
+ }
+ }
+
+ boolean finished = false;
+ while (!finished) {
+ finished = true;
+ for (Map.Entry<String, String> entry : allConfigs.entrySet()) {
+ String configValue = entry.getValue();
+ for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
+ String lookUpValue = lookUpEntry.getValue();
+ if (lookUpValue.contains("${@//site/")) {
+ continue;
+ }
+ String lookUpKey = lookUpEntry.getKey();
+ if (configValue != null && configValue.contains(lookUpKey)) {
+ configValue = configValue.replace(lookUpKey, lookUpValue);
+ }
+ }
+ if (!configValue.equals(entry.getValue())) {
+ finished = false;
+ allConfigs.put(entry.getKey(), configValue);
+ }
+ }
+ }
+
+ for (String configType : configurations.keySet()) {
+ Map<String, String> configBucket = configurations.get(configType);
+ for (Map.Entry<String, String> entry: configBucket.entrySet()) {
+ String configName = entry.getKey();
+ String configValue = entry.getValue();
+ for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
+ String lookUpValue = lookUpEntry.getValue();
+ if (lookUpValue.contains("${@//site/")) {
+ continue;
+ }
+ String lookUpKey = lookUpEntry.getKey();
+ if (configValue != null && configValue.contains(lookUpKey)) {
+ configValue = configValue.replace(lookUpKey, lookUpValue);
+ }
+ }
+ configBucket.put(configName, configValue);
+ }
+ }
}
/**
- * Find a script in an expanded archive
- * @param base base directory
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory
+ * Return a set of configuration description/names represented in the app.
+ * configuration
+ * @param roleGroup component group
+ * @param appConf app configurations
+ * @return set of configuration description/names
*/
- public File findBinScriptInExpandedArchive(File base,
- String bindir,
- String script)
- throws FileNotFoundException {
-
- SliderUtils.verifyIsDir(base, log);
- File[] ls = base.listFiles();
- if (ls == null) {
- //here for the IDE to be happy, as the previous check will pick this case
- throw new FileNotFoundException("Failed to list directory " + base);
+ public Set<String> getApplicationConfigurationTypes(String roleGroup,
+ ConfTreeOperations appConf) {
+ Set<String> configList = new HashSet<>();
+
+ String prefix = OptionKeys.CONF_FILE_PREFIX;
+ String suffix = OptionKeys.TYPE_SUFFIX;
+ MapOperations component = appConf.getComponent(roleGroup);
+ if (component != null) {
+ addConfsToList(component, configList, prefix, suffix);
}
+ addConfsToList(appConf.getGlobalOptions(), configList, prefix, suffix);
- log.debug("Found {} entries in {}", ls.length, base);
- List<File> directories = new LinkedList<File>();
- StringBuilder dirs = new StringBuilder();
- for (File file : ls) {
- log.debug("{}", false);
- if (file.isDirectory()) {
- directories.add(file);
- dirs.append(file.getPath()).append(" ");
+ return configList;
+ }
+
+ /**
+ * Finds all configuration description/names of the form
+ * prefixconfigDNsuffix in the configuration (e.g. conf.configDN.type).
+ * @param confMap configuration properties
+ * @param confList set containing configuration description/names
+ * @param prefix configuration key prefix to match
+ * @param suffix configuration key suffix to match
+ */
+ private void addConfsToList(Map<String, String> confMap,
+ Set<String> confList, String prefix, String suffix) {
+ for (String key : confMap.keySet()) {
+ if (key.startsWith(prefix) && key.endsWith(suffix)) {
+ String confName = key.substring(prefix.length(),
+ key.length() - suffix.length());
+ if (!confName.isEmpty()) {
+ confList.add(confName);
+ }
}
}
- if (directories.size() > 1) {
- throw new FileNotFoundException(
- "Too many directories in archive to identify binary: " + dirs);
+ }
+
+ /**
+ * Build a map of package description/name to package key/value properties
+ * (there should be two properties, type and name).
+ * @param roleGroup component group
+ * @param appConf app configurations
+ * @return map of package description/name to package key/value properties
+ * @throws BadConfigException package name or type is not specified
+ */
+ public Map<String, Map<String, String>> getPackages(String roleGroup,
+ ConfTreeOperations appConf) throws BadConfigException {
+ Map<String, Map<String, String>> packages = new HashMap<>();
+ String prefix = OptionKeys.PKG_FILE_PREFIX;
+ String typeSuffix = OptionKeys.TYPE_SUFFIX;
+ String nameSuffix = OptionKeys.NAME_SUFFIX;
+ MapOperations component = appConf.getComponent(roleGroup);
+ if (component == null) {
+ component = appConf.getGlobalOptions();
+ }
+ for (Map.Entry<String, String> entry : component.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ String confName;
+ String type;
+ if (key.endsWith(typeSuffix)) {
+ confName = key.substring(prefix.length(), key.length() - typeSuffix.length());
+ type = typeSuffix;
+ } else if (key.endsWith(nameSuffix)) {
+ confName = key.substring(prefix.length(), key.length() - nameSuffix.length());
+ type = nameSuffix;
+ } else {
+ continue;
+ }
+ if (!packages.containsKey(confName)) {
+ packages.put(confName, new HashMap<String, String>());
+ }
+ packages.get(confName).put(type, entry.getValue());
+ }
}
- if (directories.isEmpty()) {
- throw new FileNotFoundException(
- "No directory found in archive " + base);
+
+ for (Entry<String, Map<String, String>> pkg : packages.entrySet()) {
+ if (!pkg.getValue().containsKey(OptionKeys.TYPE_SUFFIX)) {
+ throw new BadConfigException("Package " + pkg.getKey() + " doesn't " +
+ "have a package type");
+ }
+ if (!pkg.getValue().containsKey(OptionKeys.NAME_SUFFIX)) {
+ throw new BadConfigException("Package " + pkg.getKey() + " doesn't " +
+ "have a package name");
+ }
}
- File archive = directories.get(0);
- File bin = new File(archive, bindir);
- SliderUtils.verifyIsDir(bin, log);
- File scriptFile = new File(bin, script);
- SliderUtils.verifyFileExists(scriptFile, log);
- return scriptFile;
+
+ return packages;
}
/**
- * Return any additional arguments (argv) to provide when starting this role
- *
- * @param roleOptions
- * The options for this role
- * @return A non-null String which contains command line arguments for this role, or the empty string.
+ * Return system configurations requested by the app.
+ * @param appConf app configurations
+ * @return set of system configurations
*/
- public static String getAdditionalArgs(Map<String,String> roleOptions) {
- if (roleOptions.containsKey(RoleKeys.ROLE_ADDITIONAL_ARGS)) {
- String additionalArgs = roleOptions.get(RoleKeys.ROLE_ADDITIONAL_ARGS);
- if (null != additionalArgs) {
- return additionalArgs;
+ public Set<String> getSystemConfigurationsRequested(
+ ConfTreeOperations appConf) {
+ Set<String> configList = new HashSet<>();
+
+ String configTypes = appConf.get(SYSTEM_CONFIGS);
+ if (configTypes != null && configTypes.length() > 0) {
+ String[] configs = configTypes.split(",");
+ for (String config : configs) {
+ configList.add(config.trim());
}
}
- return "";
+ return configList;
}
-
- public int getRoleResourceRequirement(String val,
- int defVal,
- int maxVal) {
- if (val==null) {
- val = Integer.toString(defVal);
+
+ /**
+ * For a given config description/name, pull out its site configs from the
+ * source config map, remove the site.configDN. prefix from them, and place
+ * them into a new config map using the {@link #propagateSiteOptions} method
+ * (with tokens substituted). This new k/v map is put as the value for the
+ * configDN key in the configurations map.
+ * @param configName config description/name
+ * @param sourceConfig config containing site.* properties
+ * @param configurations configuration map to be populated
+ * @param tokens initial substitution tokens
+ * @param amState access to AM state
+ */
+ private void addNamedConfiguration(String configName,
+ Map<String, String> sourceConfig,
+ Map<String, Map<String, String>> configurations,
+ Map<String, String> tokens, StateAccessForProviders amState) {
+ Map<String, String> config = new HashMap<>();
+ if (configName.equals(GLOBAL_CONFIG_TAG)) {
+ addDefaultGlobalConfig(config);
}
- Integer intVal;
- if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) {
- intVal = maxVal;
+ // add role hosts to tokens
+ addRoleRelatedTokens(tokens, amState);
+ propagateSiteOptions(sourceConfig, config, configName, tokens);
+
+ configurations.put(configName, config);
+ }
+
+ /**
+ * Get initial token map to be substituted into config values.
+ * @param appConf app configurations
+ * @param internals internal configurations
+ * @param componentName component name
+ * @param componentGroup component group
+ * @param clusterName app name
+ * @return tokens to replace
+ */
+ public Map<String, String> getStandardTokenMap(ConfTreeOperations appConf,
+ ConfTreeOperations internals, String componentName,
+ String componentGroup, String clusterName) {
+ return getStandardTokenMap(appConf, internals, componentName,
+ componentGroup, null, clusterName);
+ }
+
+ /**
+ * Get initial token map to be substituted into config values.
+ * @param appConf app configurations
+ * @param internals internal configurations
+ * @param componentName component name
+ * @param componentGroup component group
+ * @param containerId container ID
+ * @param clusterName app name
+ * @return tokens to replace
+ */
+ public Map<String, String> getStandardTokenMap(ConfTreeOperations appConf,
+ ConfTreeOperations internals, String componentName,
+ String componentGroup, String containerId, String clusterName) {
+
+ Map<String, String> tokens = new HashMap<>();
+ if (containerId != null) {
+ tokens.put("${CONTAINER_ID}", containerId);
+ }
+ String nnuri = appConf.get("site.fs.defaultFS");
+ tokens.put("${NN_URI}", nnuri);
+ tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
+ tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS));
+ tokens.put("${DEFAULT_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH));
+ String prefix = appConf.getComponentOpt(componentGroup, ROLE_PREFIX,
+ null);
+ String dataDirSuffix = "";
+ if (prefix == null) {
+ prefix = "";
} else {
- intVal = Integer.decode(val);
+ dataDirSuffix = "_" + SliderUtils.trimPrefix(prefix);
}
- return intVal;
+ tokens.put("${DEFAULT_DATA_DIR}", internals.getGlobalOptions()
+ .getOption(InternalKeys.INTERNAL_DATA_DIR_PATH, null) + dataDirSuffix);
+ tokens.put("${JAVA_HOME}", appConf.get(JAVA_HOME));
+ tokens.put("${COMPONENT_NAME}", componentName);
+ tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase());
+ tokens.put("${COMPONENT_PREFIX}", prefix);
+ tokens.put("${COMPONENT_PREFIX.lc}", prefix.toLowerCase());
+ if (!componentName.equals(componentGroup) &&
+ componentName.startsWith(componentGroup)) {
+ tokens.put("${COMPONENT_ID}",
+ componentName.substring(componentGroup.length()));
+ }
+ if (clusterName != null) {
+ tokens.put("${CLUSTER_NAME}", clusterName);
+ tokens.put("${CLUSTER_NAME.lc}", clusterName.toLowerCase());
+ tokens.put("${APP_NAME}", clusterName);
+ tokens.put("${APP_NAME.lc}", clusterName.toLowerCase());
+ }
+ tokens.put("${APP_COMPONENT_NAME}", componentName);
+ tokens.put("${APP_COMPONENT_NAME.lc}", componentName.toLowerCase());
+ return tokens;
+ }
+
+ /**
+ * Add ROLE_HOST tokens for substitution into config values.
+ * @param tokens existing tokens
+ * @param amState access to AM state
+ */
+ public void addRoleRelatedTokens(Map<String, String> tokens,
+ StateAccessForProviders amState) {
+ if (amState == null) {
+ return;
+ }
+ for (Map.Entry<String, Map<String, ClusterNode>> entry :
+ amState.getRoleClusterNodeMapping().entrySet()) {
+ String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
+ String hosts = StringUtils .join(",",
+ getHostsList(entry.getValue().values(), true));
+ tokens.put("${" + tokenName + "}", hosts);
+ }
+ }
+
+ /**
+ * Add global configuration properties.
+ * @param config map where default global properties will be added
+ */
+ private void addDefaultGlobalConfig(Map<String, String> config) {
+ config.put("app_log_dir", "${LOG_DIR}");
+ config.put("app_pid_dir", "${WORK_DIR}/app/run");
+ config.put("app_install_dir", "${WORK_DIR}/app/install");
+ config.put("app_conf_dir", "${WORK_DIR}/" + APP_CONF_DIR);
+ config.put("app_input_conf_dir", "${WORK_DIR}/" + PROPAGATED_CONF_DIR_NAME);
+
+ // add optional parameters only if they are not already provided
+ if (!config.containsKey("pid_file")) {
+ config.put("pid_file", "${WORK_DIR}/app/run/component.pid");
+ }
+ if (!config.containsKey("app_root")) {
+ config.put("app_root", "${WORK_DIR}/app/install");
+ }
+ }
+
+ /**
+ * Return a list of hosts based on current ClusterNodes.
+ * @param values cluster nodes
+ * @param hostOnly whether host or host/server name will be added to list
+ * @return list of hosts
+ */
+ public Iterable<String> getHostsList(Collection<ClusterNode> values,
+ boolean hostOnly) {
+ List<String> hosts = new ArrayList<>();
+ for (ClusterNode cn : values) {
+ hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name);
+ }
+ return hosts;
+ }
+
+ /**
+ * Update ServiceRecord in Registry with IP and hostname.
+ * @param amState access to AM state
+ * @param yarnRegistry acces to YARN registry
+ * @param containerId container ID
+ * @param roleName component name
+ * @param ip list of IPs
+ * @param hostname hostname
+ */
+ public void updateServiceRecord(StateAccessForProviders amState,
+ YarnRegistryViewForProviders yarnRegistry,
+ String containerId, String roleName, List<String> ip, String hostname) {
+ try {
+ RoleInstance role = null;
+ if(ip != null && !ip.isEmpty()){
+ role = amState.getOwnedContainer(containerId);
+ role.ip = ip.get(0);
+ }
+ if(hostname != null && !hostname.isEmpty()){
+ role = amState.getOwnedContainer(containerId);
+ role.hostname = hostname;
+ }
+ if (role != null) {
+ // create and publish updated service record (including hostname & ip)
+ ServiceRecord record = new ServiceRecord();
+ record.set(YarnRegistryAttributes.YARN_ID, containerId);
+ record.description = roleName.replaceAll("_", "-");
+ record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+ PersistencePolicies.CONTAINER);
+ // TODO: use constants from YarnRegistryAttributes
+ if (role.ip != null) {
+ record.set("yarn:ip", role.ip);
+ }
+ if (role.hostname != null) {
+ record.set("yarn:hostname", role.hostname);
+ }
+ yarnRegistry.putComponent(
+ RegistryPathUtils.encodeYarnID(containerId), record);
+ }
+ } catch (NoSuchNodeException e) {
+ // ignore - there is nothing to do if we don't find a container
+ log.warn("Owned container {} not found - {}", containerId, e);
+ } catch (IOException e) {
+ log.warn("Error updating container {} service record in registry",
+ containerId, e);
+ }
+ }
+
+ /**
+ * Publish a named property bag that may contain name-value pairs for app
+ * configurations such as hbase-site.
+ * @param name config file identifying name
+ * @param description config file description
+ * @param entries config file properties
+ * @param amState access to AM state
+ */
+ public void publishApplicationInstanceData(String name, String description,
+ Iterable<Map.Entry<String, String>> entries,
+ StateAccessForProviders amState) {
+ PublishedConfiguration pubconf = new PublishedConfiguration(description,
+ entries);
+ log.info("publishing {}", pubconf);
+ amState.getPublishedSliderConfigurations().put(name, pubconf);
+ }
+
+ /**
+ * Publish an export group.
+ * @param exportGroup export groups
+ * @param amState access to AM state
+ * @param roleGroup component group
+ */
+ public void publishExportGroup(Map<String, List<ExportEntry>> exportGroup,
+ StateAccessForProviders amState, String roleGroup) {
+ // Publish in old format for the time being
+ Map<String, String> simpleEntries = new HashMap<>();
+ for (Entry<String, List<ExportEntry>> entry : exportGroup.entrySet()) {
+ List<ExportEntry> exports = entry.getValue();
+ if (SliderUtils.isNotEmpty(exports)) {
+ // there is no support for multiple exports per name, so extract only
+ // the first one
+ simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
+ }
+ }
+ publishApplicationInstanceData(roleGroup, roleGroup,
+ simpleEntries.entrySet(), amState);
+
+ PublishedExports exports = new PublishedExports(roleGroup);
+ exports.setUpdated(new Date().getTime());
+ exports.putValues(exportGroup.entrySet());
+ amState.getPublishedExportsSet().put(roleGroup, exports);
+ }
+
+ public Map<String, String> getExports(ConfTreeOperations appConf,
+ String roleGroup) {
+ Map<String, String> exports = new HashMap<>();
+ propagateOptions(appConf.getComponent(roleGroup).options, exports,
+ null, OptionKeys.EXPORT_PREFIX);
+ return exports;
+ }
+
+ private static final String COMPONENT_TAG = "component";
+ private static final String HOST_FOLDER_FORMAT = "%s:%s";
+ private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
+ private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
+
+ /**
+ * Format the folder locations and publish in the registry service.
+ * @param folders folder information
+ * @param containerId container ID
+ * @param hostFqdn host FQDN
+ * @param componentName component name
+ */
+ public void publishFolderPaths(Map<String, String> folders,
+ String containerId, String componentName, String hostFqdn,
+ StateAccessForProviders amState,
+ Map<String, ExportEntry> logFolderExports,
+ Map<String, ExportEntry> workFolderExports) {
+ Date now = new Date();
+ for (Map.Entry<String, String> entry : folders.entrySet()) {
+ ExportEntry exportEntry = new ExportEntry();
+ exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn,
+ entry.getValue()));
+ exportEntry.setContainerId(containerId);
+ exportEntry.setLevel(COMPONENT_TAG);
+ exportEntry.setTag(componentName);
+ exportEntry.setUpdatedTime(now.toString());
+ if (entry.getKey().equals("AGENT_LOG_ROOT") ||
+ entry.getKey().equals("LOG_DIR")) {
+ synchronized (logFolderExports) {
+ logFolderExports.put(containerId, exportEntry);
+ }
+ } else {
+ synchronized (workFolderExports) {
+ workFolderExports.put(containerId, exportEntry);
+ }
+ }
+ log.info("Updating log and pwd folders for container {}", containerId);
+ }
+
+ PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
+ exports.setUpdated(now.getTime());
+ synchronized (logFolderExports) {
+ updateExportsFromList(exports, logFolderExports);
+ }
+ amState.getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
+
+ exports = new PublishedExports(CONTAINER_PWDS_TAG);
+ exports.setUpdated(now.getTime());
+ synchronized (workFolderExports) {
+ updateExportsFromList(exports, workFolderExports);
+ }
+ amState.getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
+ }
+
+ /**
+ * Update the export data from the map.
+ * @param exports published exports
+ * @param folderExports folder exports
+ */
+ private void updateExportsFromList(PublishedExports exports,
+ Map<String, ExportEntry> folderExports) {
+ Map<String, List<ExportEntry>> perComponentList = new HashMap<>();
+ for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet()) {
+ String componentName = logEntry.getValue().getTag();
+ if (!perComponentList.containsKey(componentName)) {
+ perComponentList.put(componentName, new ArrayList<ExportEntry>());
+ }
+ perComponentList.get(componentName).add(logEntry.getValue());
+ }
+ exports.putValues(perComponentList.entrySet());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org