You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/09/01 14:43:13 UTC
[1/3] hadoop git commit: YARN-5505. Create an agent-less docker
provider in the native-services framework. Contributed by Billie Rinaldi
Repository: hadoop
Updated Branches:
refs/heads/yarn-native-services d0eb0e7d0 -> c999964a0
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderService.java
new file mode 100644
index 0000000..bebb5f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderService.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.docker;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.OptionKeys;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.launch.CommandLineBuilder;
+import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.ConfigUtils;
+import org.apache.slider.core.registry.docstore.ExportEntry;
+import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCore;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.providers.ProviderUtils;
+import org.apache.slider.server.appmaster.state.RoleInstance;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Scanner;
+
+public class DockerProviderService extends AbstractProviderService implements
+ ProviderCore,
+ DockerKeys,
+ SliderKeys {
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(DockerProviderService.class);
+ private static final ProviderUtils providerUtils = new ProviderUtils(log);
+ private static final String EXPORT_GROUP = "quicklinks";
+ private static final String APPLICATION_TAG = "application";
+
+ private String clusterName = null;
+ private SliderFileSystem fileSystem = null;
+
+ protected DockerProviderService() {
+ super("DockerProviderService");
+ }
+
+ @Override
+ public List<ProviderRole> getRoles() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isSupportedRole(String role) {
+ return true;
+ }
+
+ @Override
+ public void validateInstanceDefinition(AggregateConf instanceDefinition)
+ throws SliderException {
+ }
+
+ private String getClusterName() {
+ if (SliderUtils.isUnset(clusterName)) {
+ clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME);
+ }
+ return clusterName;
+ }
+
+ @Override
+ public void buildContainerLaunchContext(ContainerLauncher launcher,
+ AggregateConf instanceDefinition, Container container,
+ ProviderRole providerRole, SliderFileSystem fileSystem,
+ Path generatedConfPath, MapOperations resourceComponent,
+ MapOperations appComponent, Path containerTmpDirPath)
+ throws IOException, SliderException {
+
+ String roleName = providerRole.name;
+ String roleGroup = providerRole.group;
+
+ initializeApplicationConfiguration(instanceDefinition, fileSystem,
+ roleGroup);
+
+ log.info("Build launch context for Docker");
+ log.debug(instanceDefinition.toString());
+
+ ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+ launcher.setYarnDockerMode(true);
+ launcher.setDockerImage(appConf.getComponentOpt(roleGroup, DOCKER_IMAGE,
+ null));
+ launcher.setDockerNetwork(appConf.getComponentOpt(roleGroup, DOCKER_NETWORK,
+ DEFAULT_DOCKER_NETWORK));
+ launcher.setRunPrivilegedContainer(appConf.getComponentOptBool(roleGroup,
+ DOCKER_USE_PRIVILEGED, false));
+
+ // Set the environment
+ launcher.putEnv(SliderUtils.buildEnvMap(appComponent,
+ providerUtils.getStandardTokenMap(getAmState().getAppConfSnapshot(),
+ getAmState().getInternalsSnapshot(), roleName, roleGroup,
+ getClusterName())));
+
+ String workDir = ApplicationConstants.Environment.PWD.$();
+ launcher.setEnv("WORK_DIR", workDir);
+ log.info("WORK_DIR set to {}", workDir);
+ String logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR;
+ launcher.setEnv("LOG_DIR", logDir);
+ log.info("LOG_DIR set to {}", logDir);
+ if (System.getenv(HADOOP_USER_NAME) != null) {
+ launcher.setEnv(HADOOP_USER_NAME, System.getenv(HADOOP_USER_NAME));
+ }
+ //add english env
+ launcher.setEnv("LANG", "en_US.UTF-8");
+ launcher.setEnv("LC_ALL", "en_US.UTF-8");
+ launcher.setEnv("LANGUAGE", "en_US.UTF-8");
+
+ //local resources
+ providerUtils.localizePackages(launcher, fileSystem, appConf, roleGroup,
+ getClusterName());
+
+ if (SliderUtils.isHadoopClusterSecure(getConfig())) {
+ providerUtils.localizeServiceKeytabs(launcher, instanceDefinition,
+ fileSystem, getClusterName());
+ }
+
+ if (providerUtils.areStoresRequested(appComponent)) {
+ providerUtils.localizeContainerSecurityStores(launcher, container,
+ roleName, fileSystem, instanceDefinition, appComponent, getClusterName());
+ }
+
+ if (appComponent.getOptionBool(AM_CONFIG_GENERATION, false)) {
+ // build and localize configuration files
+ Map<String, Map<String, String>> configurations =
+ providerUtils.buildConfigurations(
+ instanceDefinition.getAppConfOperations(),
+ instanceDefinition.getInternalOperations(),
+ container.getId().toString(), roleName, roleGroup,
+ getAmState());
+ providerUtils.localizeConfigFiles(launcher, roleName, roleGroup,
+ appConf, configurations, launcher.getEnv(), fileSystem,
+ getClusterName());
+ }
+
+ //add the configuration resources
+ launcher.addLocalResources(fileSystem.submitDirectory(
+ generatedConfPath,
+ PROPAGATED_CONF_DIR_NAME));
+
+ CommandLineBuilder operation = new CommandLineBuilder();
+ operation.add(appConf.getComponentOpt(roleGroup, DOCKER_START_COMMAND,
+ "/bin/bash"));
+
+ operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/"
+ + OUT_FILE + " 2>" + ERR_FILE);
+
+ launcher.addCommand(operation.build());
+
+ // Additional files to localize
+ String appResourcesString = instanceDefinition.getAppConfOperations()
+ .getGlobalOptions().getOption(APP_RESOURCES, null);
+ log.info("Configuration value for extra resources to localize: {}", appResourcesString);
+ if (null != appResourcesString) {
+ try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) {
+ while (scanner.hasNext()) {
+ String resource = scanner.next();
+ Path resourcePath = new Path(resource);
+ LocalResource extraResource = fileSystem.createAmResource(
+ fileSystem.getFileSystem().resolvePath(resourcePath),
+ LocalResourceType.FILE);
+ String destination = APP_RESOURCES_DIR + "/" + resourcePath.getName();
+ log.info("Localizing {} to {}", resourcePath, destination);
+ // TODO Can we try harder to avoid collisions?
+ launcher.addLocalResource(destination, extraResource);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void initializeApplicationConfiguration(
+ AggregateConf instanceDefinition, SliderFileSystem fileSystem,
+ String roleGroup)
+ throws IOException, SliderException {
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ public void applyInitialRegistryDefinitions(URL amWebURI,
+ ServiceRecord serviceRecord)
+ throws IOException {
+ super.applyInitialRegistryDefinitions(amWebURI, serviceRecord);
+
+ // identify client component
+ String clientName = null;
+ ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
+ for (String component : appConf.getComponentNames()) {
+ if (COMPONENT_TYPE_CLIENT.equals(appConf.getComponentOpt(component,
+ COMPONENT_TYPE_KEY, null))) {
+ clientName = component;
+ break;
+ }
+ }
+ if (clientName == null) {
+ log.info("No client component specified, not publishing client configs");
+ return;
+ }
+
+ // register AM-generated client configs
+ // appConf should already be resolved!
+ MapOperations clientOperations = appConf.getComponent(clientName);
+ if (!clientOperations.getOptionBool(AM_CONFIG_GENERATION, false)) {
+ log.info("AM config generation is false, not publishing client configs");
+ return;
+ }
+
+ // build and localize configuration files
+ Map<String, Map<String, String>> configurations =
+ providerUtils.buildConfigurations(appConf, getAmState()
+ .getInternalsSnapshot(), null, clientName, clientName,
+ getAmState());
+
+ for (String configFileDN : configurations.keySet()) {
+ String configFileName = appConf.getComponentOpt(clientName,
+ OptionKeys.CONF_FILE_PREFIX + configFileDN + OptionKeys
+ .NAME_SUFFIX, null);
+ String configFileType = appConf.getComponentOpt(clientName,
+ OptionKeys.CONF_FILE_PREFIX + configFileDN + OptionKeys
+ .TYPE_SUFFIX, null);
+ if (configFileName == null && configFileType == null) {
+ continue;
+ }
+ ConfigFormat configFormat = ConfigFormat.resolve(configFileType);
+
+ Map<String, String> config = configurations.get(configFileDN);
+ ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
+ fileSystem, getClusterName(),
+ new File(configFileName).getName());
+ providerUtils.publishApplicationInstanceData(configFileDN, configFileDN,
+ config.entrySet(), getAmState());
+ }
+ }
+
+ @Override
+ public boolean processContainerStatus(ContainerId containerId,
+ ContainerStatus status) {
+ log.debug("Handling container status: {}", status);
+ if (SliderUtils.isEmpty(status.getIPs()) ||
+ SliderUtils.isUnset(status.getHost())) {
+ return true;
+ }
+ RoleInstance instance = getAmState().getOwnedContainer(containerId);
+ if (instance == null) {
+ // container is completed?
+ return false;
+ }
+
+ String roleName = instance.role;
+ String roleGroup = instance.group;
+ String containerIdStr = containerId.toString();
+
+ providerUtils.updateServiceRecord(getAmState(), yarnRegistry,
+ containerIdStr, roleName, status.getIPs(), status.getHost());
+
+ publishExportGroups(containerIdStr, roleName, roleGroup,
+ status.getHost());
+ return false;
+ }
+
+ /**
+ * This method looks for configuration properties of the form
+ * export.key,value and publishes the key,value pair. Standard tokens are
+ * substituted into the value, and COMPONENTNAME_HOST and THIS_HOST tokens
+ * are substituted with the actual hostnames of the containers.
+ */
+ protected void publishExportGroups(String containerId,
+ String roleName, String roleGroup, String thisHost) {
+ ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
+ ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot();
+
+ Map<String, String> exports = providerUtils.getExports(
+ getAmState().getAppConfSnapshot(), roleGroup);
+
+ String hostKeyFormat = "${%s_HOST}";
+
+ // publish export groups if any
+ Map<String, String> replaceTokens =
+ providerUtils.filterSiteOptions(
+ appConf.getComponent(roleGroup).options,
+ providerUtils.getStandardTokenMap(appConf, internalsConf, roleName,
+ roleGroup, containerId, getClusterName()));
+ for (Map.Entry<String, Map<String, ClusterNode>> entry :
+ getAmState().getRoleClusterNodeMapping().entrySet()) {
+ String hostName = providerUtils.getHostsList(
+ entry.getValue().values(), true).iterator().next();
+ replaceTokens.put(String.format(hostKeyFormat,
+ entry.getKey().toUpperCase(Locale.ENGLISH)), hostName);
+ }
+ replaceTokens.put("${THIS_HOST}", thisHost);
+
+ Map<String, List<ExportEntry>> entries = new HashMap<>();
+ for (Entry<String, String> export : exports.entrySet()) {
+ String value = export.getValue();
+ // replace host names and site properties
+ for (String token : replaceTokens.keySet()) {
+ if (value.contains(token)) {
+ value = value.replace(token, replaceTokens.get(token));
+ }
+ }
+ ExportEntry entry = new ExportEntry();
+ entry.setLevel(APPLICATION_TAG);
+ entry.setValue(value);
+ entry.setUpdatedTime(new Date().toString());
+ // over-write, app exports are singletons
+ entries.put(export.getKey(), new ArrayList(Arrays.asList(entry)));
+ log.info("Preparing to publish. Key {} and Value {}",
+ export.getKey(), value);
+ }
+ providerUtils.publishExportGroup(entries, getAmState(), EXPORT_GROUP);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index 67d3647..c021b80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -103,13 +103,9 @@ public class SliderAMProviderService extends AbstractProviderService implements
@Override
public void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException {
super.applyInitialRegistryDefinitions(amWebURI,
- agentOpsURI,
- agentStatusURI,
serviceRecord);
// now publish site.xml files
YarnConfiguration defaultYarnConfig = new YarnConfiguration();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 983b5ba..57ec218 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -784,8 +784,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
contentCache);
initAMFilterOptions(serviceConf);
- // start the agent web app
- startAgentWebApp(appInformation, serviceConf, webAppApi);
+ if (providerService instanceof AgentProviderService) {
+ // start the agent web app
+ startAgentWebApp(appInformation, serviceConf, webAppApi);
+ }
int webAppPort = deployWebApplication(webAppApi);
String scheme = WebAppUtils.HTTP_PREFIX;
@@ -1296,8 +1298,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// the registry is running, so register services
URL amWebURI = new URL(appMasterProxiedUrl);
- URL agentOpsURI = new URL(agentOpsUrl);
- URL agentStatusURI = new URL(agentStatusUrl);
//Give the provider restricted access to the state, registry
setupInitialRegistryPaths();
@@ -1324,15 +1324,20 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// internal services
sliderAMProvider.applyInitialRegistryDefinitions(amWebURI,
- agentOpsURI,
- agentStatusURI,
serviceRecord);
// provider service dynamic definitions.
- providerService.applyInitialRegistryDefinitions(amWebURI,
- agentOpsURI,
- agentStatusURI,
- serviceRecord);
+ if (providerService instanceof AgentProviderService) {
+ URL agentOpsURI = new URL(agentOpsUrl);
+ URL agentStatusURI = new URL(agentStatusUrl);
+ ((AgentProviderService)providerService).applyInitialRegistryDefinitions(
+ amWebURI,
+ agentOpsURI,
+ agentStatusURI,
+ serviceRecord);
+ } else {
+ providerService.applyInitialRegistryDefinitions(amWebURI, serviceRecord);
+ }
// set any provided attributes
setProvidedServiceRecordAttributes(
@@ -2285,6 +2290,20 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
ContainerStatus containerStatus) {
LOG_YARN.debug("Container Status: id={}, status={}", containerId,
containerStatus);
+ if (providerService.processContainerStatus(containerId, containerStatus)) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ RoleInstance cinfo = appState.getOwnedContainer(containerId);
+ if (cinfo != null) {
+ LOG_YARN.info("Re-requesting status for role {}, {}",
+ cinfo.role, containerId);
+ //trigger another async container status
+ nmClientAsync.getContainerStatusAsync(containerId,
+ cinfo.container.getNodeId());
+ }
+ }
}
@Override // NMClientAsync.CallbackHandler
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/resources/org/apache/slider/slider.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/resources/org/apache/slider/slider.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/resources/org/apache/slider/slider.xml
index 37ac65c..a2517d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/resources/org/apache/slider/slider.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/resources/org/apache/slider/slider.xml
@@ -27,4 +27,8 @@
<name>slider.provider.agent</name>
<value>org.apache.slider.providers.agent.AgentProviderFactory</value>
</property>
+ <property>
+ <name>slider.provider.docker</name>
+ <value>org.apache.slider.providers.docker.DockerProviderFactory</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/appConfig.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/appConfig.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/appConfig.json
new file mode 100644
index 0000000..c87f77c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/appConfig.json
@@ -0,0 +1,42 @@
+{
+ "schema": "http://example.org/specification/v2.0.0",
+ "metadata": {},
+ "global": {
+ "am.config.generation": "true",
+ "component.unique.names": "true",
+
+ "export.app.monitor": "${COMPONENT1_HOST} : ${@//site/test-xml/xmlkey}",
+ "export.other.key": "exportvalue",
+
+ "docker.image": "docker.io/centos:centos6",
+ "docker.startCommand": "sleep 600",
+
+ "conf.test-json.type": "json",
+ "conf.test-json.name": "/tmp/test.json",
+ "conf.test-xml.type": "xml",
+ "conf.test-xml.name": "/tmp/test.xml",
+ "conf.test-properties.type": "properties",
+ "conf.test-properties.name": "/tmp/test.xml",
+ "conf.test-yaml.type": "yaml",
+ "conf.test-yaml.name": "/tmp/test.yaml",
+ "conf.test-env.type": "env",
+ "conf.test-env.name": "/tmp/testenv",
+ "conf.test-template.type": "template",
+ "conf.test-template.name": "/tmp/test.template",
+ "conf.test-hadoop-xml.type": "hadoop-xml",
+ "conf.test-hadoop-xml.name": "/tmp/test-hadoop.xml",
+
+ "site.test-json.jsonkey": "val1",
+ "site.test-xml.xmlkey": "val2",
+ "site.test-hadoop-xml.xmlkey": "val3",
+ "site.test-properties.propkey": "val4",
+ "site.test-yaml.yamlkey": "val5",
+ "site.test-env.content": "test ${envkey1} {{envkey2}} content",
+ "site.test-env.envkey1": "envval1",
+ "site.test-env.envkey2": "envval2",
+ "site.test-template.templatekey1": "templateval1",
+ "site.test-template.templatekey2": "templateval2"
+ },
+ "components": {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/resources.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/resources.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/resources.json
new file mode 100644
index 0000000..1b06224
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/resources.json
@@ -0,0 +1,16 @@
+{
+ "schema": "http://example.org/specification/v2.0.0",
+ "metadata": {},
+ "global": {},
+ "components": {
+ "slider-appmaster": {
+ "yarn.memory": "384"
+ },
+ "COMPONENT": {
+ "yarn.role.priority": "1",
+ "yarn.component.instances": 2,
+ "yarn.memory": "512",
+ "yarn.vcores": "2"
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/test.template
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/test.template b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/test.template
new file mode 100644
index 0000000..2922655
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/resources/org/apache/slider/providers/docker/test.template
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+test ${templatekey1} {{templatekey2}} content
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: YARN-5505. Create an agent-less docker
provider in the native-services framework. Contributed by Billie Rinaldi
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
index 8203cf0..fdc5be1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
@@ -82,6 +82,8 @@ import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
+import static org.apache.slider.common.tools.SliderUtils.getApplicationDefinitionPath;
+
/** This class implements the client-side aspects of the agent deployer */
public class AgentClientProvider extends AbstractClientProvider
implements AgentKeys, SliderKeys {
@@ -132,13 +134,13 @@ public class AgentClientProvider extends AbstractClientProvider
sliderFileSystem.verifyFileExists(appDefPath);
String agentConf = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_CONF, "");
+ getGlobalOptions().getOption(AGENT_CONF, "");
if (StringUtils.isNotEmpty(agentConf)) {
sliderFileSystem.verifyFileExists(new Path(agentConf));
}
String appHome = instanceDefinition.getAppConfOperations().
- getGlobalOptions().get(AgentKeys.PACKAGE_PATH);
+ getGlobalOptions().get(PACKAGE_PATH);
if (SliderUtils.isUnset(appHome)) {
String agentImage = instanceDefinition.getInternalOperations().
get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
@@ -173,7 +175,7 @@ public class AgentClientProvider extends AbstractClientProvider
}
Set<String> names = resources.getComponentNames();
- names.remove(SliderKeys.COMPONENT_AM);
+ names.remove(COMPONENT_AM);
Map<Integer, String> priorityMap = new HashMap<Integer, String>();
for (String name : names) {
@@ -271,7 +273,7 @@ public class AgentClientProvider extends AbstractClientProvider
String agentImage = instanceDefinition.getInternalOperations().
get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
if (SliderUtils.isUnset(agentImage)) {
- Path agentPath = new Path(tempPath.getParent(), AgentKeys.PROVIDER_AGENT);
+ Path agentPath = new Path(tempPath.getParent(), PROVIDER_AGENT);
log.info("Automatically uploading the agent tarball at {}", agentPath);
fileSystem.getFileSystem().mkdirs(agentPath);
if (ProviderUtils.addAgentTar(this, AGENT_TAR, fileSystem, agentPath)) {
@@ -284,6 +286,12 @@ public class AgentClientProvider extends AbstractClientProvider
@Override
public Set<String> getApplicationTags(SliderFileSystem fileSystem,
+ ConfTreeOperations appConf) throws SliderException {
+ return getApplicationTags(fileSystem,
+ getApplicationDefinitionPath(appConf));
+ }
+
+ public Set<String> getApplicationTags(SliderFileSystem fileSystem,
String appDef) throws SliderException {
Set<String> tags;
Metainfo metaInfo = getMetainfo(fileSystem, appDef);
@@ -437,19 +445,19 @@ public class AgentClientProvider extends AbstractClientProvider
if (config != null) {
try {
clientRoot = config.getJSONObject("global")
- .getString(AgentKeys.APP_CLIENT_ROOT);
+ .getString(APP_CLIENT_ROOT);
} catch (JSONException e) {
log.info("Couldn't read {} from provided client config, falling " +
- "back on default", AgentKeys.APP_CLIENT_ROOT);
+ "back on default", APP_CLIENT_ROOT);
}
}
if (clientRoot == null && defaultConfig != null) {
try {
clientRoot = defaultConfig.getJSONObject("global")
- .getString(AgentKeys.APP_CLIENT_ROOT);
+ .getString(APP_CLIENT_ROOT);
} catch (JSONException e) {
log.info("Couldn't read {} from default client config, using {}",
- AgentKeys.APP_CLIENT_ROOT, clientInstallPath);
+ APP_CLIENT_ROOT, clientInstallPath);
}
}
if (clientRoot == null) {
@@ -500,7 +508,7 @@ public class AgentClientProvider extends AbstractClientProvider
try {
String clientScriptPath = appPkgDir.getAbsolutePath() + File.separator + "package" +
File.separator + clientScript;
- List<String> command = Arrays.asList(AgentKeys.PYTHON_EXE,
+ List<String> command = Arrays.asList(PYTHON_EXE,
"-S",
clientScriptPath,
"INSTALL",
@@ -510,12 +518,12 @@ public class AgentClientProvider extends AbstractClientProvider
"DEBUG");
ProcessBuilder pb = new ProcessBuilder(command);
log.info("Command: " + StringUtils.join(pb.command(), " "));
- pb.environment().put(SliderKeys.PYTHONPATH,
+ pb.environment().put(PYTHONPATH,
agentPkgDir.getAbsolutePath()
+ File.separator + "slider-agent" + File.pathSeparator
+ agentPkgDir.getAbsolutePath()
+ File.separator + "slider-agent/jinja2");
- log.info("{}={}", SliderKeys.PYTHONPATH, pb.environment().get(SliderKeys.PYTHONPATH));
+ log.info("{}={}", PYTHONPATH, pb.environment().get(PYTHONPATH));
Process proc = pb.start();
InputStream stderr = proc.getErrorStream();
@@ -555,8 +563,8 @@ public class AgentClientProvider extends AbstractClientProvider
private void expandAgentTar(File agentPkgDir) throws IOException {
String libDirProp =
- System.getProperty(SliderKeys.PROPERTY_LIB_DIR);
- File tarFile = new File(libDirProp, SliderKeys.AGENT_TAR);
+ System.getProperty(PROPERTY_LIB_DIR);
+ File tarFile = new File(libDirProp, AGENT_TAR);
expandTar(tarFile, agentPkgDir);
}
@@ -620,7 +628,7 @@ public class AgentClientProvider extends AbstractClientProvider
String name) throws SliderException {
try {
JSONObject pkgList = new JSONObject();
- pkgList.put(AgentKeys.PACKAGE_LIST,
+ pkgList.put(PACKAGE_LIST,
AgentProviderService.getPackageListFromApplication(metainfo.getApplication()));
JSONObject obj = new JSONObject();
obj.put("hostLevelParams", pkgList);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
index 9ea984c..c4228e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -23,6 +23,7 @@ package org.apache.slider.providers.agent;
*/
public interface AgentKeys {
+ String AGENT_TAR = "slider-agent.tar.gz";
String PROVIDER_AGENT = "agent";
String ROLE_NODE = "echo";
@@ -76,23 +77,13 @@ public interface AgentKeys {
String AGENT_CONF = "agent.conf";
String ADDON_FOR_ALL_COMPONENTS = "ALL";
- String APP_RESOURCES = "application.resources";
- String APP_RESOURCES_DIR = "app/resources";
-
- String APP_CONF_DIR = "app/conf";
-
String AGENT_INSTALL_DIR = "infra/agent";
String APP_DEFINITION_DIR = "app/definition";
String ADDON_DEFINITION_DIR = "addon/definition";
String AGENT_CONFIG_FILE = "infra/conf/agent.ini";
String AGENT_VERSION_FILE = "infra/version";
- String APP_PACKAGES_DIR = "app/packages";
- String PER_COMPONENT = "per.component";
- String PER_GROUP = "per.group";
- String JAVA_HOME = "java_home";
String PACKAGE_LIST = "package_list";
- String SYSTEM_CONFIGS = "system_configs";
String WAIT_HEARTBEAT = "wait.heartbeat";
String PYTHON_EXE = "python";
String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node";
@@ -104,7 +95,6 @@ public interface AgentKeys {
String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt";
String KEY_CONTAINER_LAUNCH_DELAY = "container.launch.delay.sec";
String TEST_RELAX_VERIFICATION = "test.relax.validation";
- String AM_CONFIG_GENERATION = "am.config.generation";
String DEFAULT_METAINFO_MAP_KEY = "DEFAULT_KEY";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 2ab5c6f..499812e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -21,20 +21,11 @@ package org.apache.slider.providers.agent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
@@ -55,7 +46,6 @@ import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderException;
@@ -65,7 +55,6 @@ import org.apache.slider.core.registry.docstore.ConfigFormat;
import org.apache.slider.core.registry.docstore.ConfigUtils;
import org.apache.slider.core.registry.docstore.ExportEntry;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.AbstractProviderService;
@@ -107,20 +96,15 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand;
-import org.apache.slider.server.services.security.CertificateManager;
-import org.apache.slider.server.services.security.SecurityStore;
-import org.apache.slider.server.services.security.StoresGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -157,10 +141,6 @@ public class AgentProviderService extends AbstractProviderService implements
private static final String LABEL_MAKER = "___";
private static final String CONTAINER_ID = "container_id";
private static final String GLOBAL_CONFIG_TAG = "global";
- private static final String LOG_FOLDERS_TAG = "LogFolders";
- private static final String HOST_FOLDER_FORMAT = "%s:%s";
- private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
- private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
private static final String COMPONENT_TAG = "component";
private static final String APPLICATION_TAG = "application";
private static final String COMPONENT_DATA_TAG = "ComponentInstanceData";
@@ -249,13 +229,6 @@ public class AgentProviderService extends AbstractProviderService implements
}
@Override
- public Configuration loadProviderConfigurationInformation(File confDir) throws
- BadCommandArgumentsException,
- IOException {
- return new Configuration(false);
- }
-
- @Override
public void validateInstanceDefinition(AggregateConf instanceDefinition)
throws
SliderException {
@@ -265,7 +238,7 @@ public class AgentProviderService extends AbstractProviderService implements
instanceDefinition.getResourceOperations();
Set<String> names = resources.getComponentNames();
- names.remove(SliderKeys.COMPONENT_AM);
+ names.remove(COMPONENT_AM);
for (String name : names) {
Component componentDef = getApplicationComponent(name);
if (componentDef == null) {
@@ -350,9 +323,9 @@ public class AgentProviderService extends AbstractProviderService implements
// build a map from component to metainfo
String addonAppDefString = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(AgentKeys.ADDONS, null);
+ .getGlobalOptions().getOption(ADDONS, null);
if (component != null) {
- addonAppDefString = component.getOption(AgentKeys.ADDONS, addonAppDefString);
+ addonAppDefString = component.getOption(ADDONS, addonAppDefString);
}
log.debug("All addon appdefs: {}", addonAppDefString);
if (addonAppDefString != null) {
@@ -415,6 +388,7 @@ public class AgentProviderService extends AbstractProviderService implements
if (isYarnDockerContainer(roleGroup)) {
launcher.setYarnDockerMode(true);
launcher.setDockerImage(getConfigFromMetaInfo(roleGroup, "image"));
+ launcher.setDockerNetwork(getConfigFromMetaInfo(roleGroup, "network"));
launcher.setRunPrivilegedContainer(getConfigFromMetaInfo(roleGroup, "runPriviledgedContainer"));
launcher
.setYarnContainerMountPoints(getConfigFromMetaInfoWithAppConfigOverriding(
@@ -423,7 +397,9 @@ public class AgentProviderService extends AbstractProviderService implements
// Set the environment
launcher.putEnv(SliderUtils.buildEnvMap(appComponent,
- getStandardTokenMap(getAmState().getAppConfSnapshot(), roleName, roleGroup)));
+ providerUtils.getStandardTokenMap(getAmState().getAppConfSnapshot(),
+ getAmState().getInternalsSnapshot(), roleName, roleGroup,
+ getClusterName())));
String workDir = ApplicationConstants.Environment.PWD.$();
launcher.setEnv("AGENT_WORK_ROOT", workDir);
@@ -444,17 +420,17 @@ public class AgentProviderService extends AbstractProviderService implements
//local resources
// TODO: Should agent need to support App Home
- String scriptPath = new File(AgentKeys.AGENT_MAIN_SCRIPT_ROOT, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
+ String scriptPath = new File(AGENT_MAIN_SCRIPT_ROOT, AGENT_MAIN_SCRIPT).getPath();
String appHome = instanceDefinition.getAppConfOperations().
- getGlobalOptions().get(AgentKeys.PACKAGE_PATH);
+ getGlobalOptions().get(PACKAGE_PATH);
if (SliderUtils.isSet(appHome)) {
- scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
+ scriptPath = new File(appHome, AGENT_MAIN_SCRIPT).getPath();
}
// set PYTHONPATH
List<String> pythonPaths = new ArrayList<String>();
- pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT);
- pythonPaths.add(AgentKeys.AGENT_JINJA2_ROOT);
+ pythonPaths.add(AGENT_MAIN_SCRIPT_ROOT);
+ pythonPaths.add(AGENT_JINJA2_ROOT);
String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths);
launcher.setEnv(PYTHONPATH, pythonPath);
log.info("PYTHONPATH set to {}", pythonPath);
@@ -466,21 +442,21 @@ public class AgentProviderService extends AbstractProviderService implements
agentImagePath =
new Path(new Path(new Path(instanceDefinition.getInternalOperations().get(InternalKeys.INTERNAL_TMP_DIR),
container.getId().getApplicationAttemptId().getApplicationId().toString()),
- AgentKeys.PROVIDER_AGENT),
- SliderKeys.AGENT_TAR);
+ PROVIDER_AGENT),
+ AGENT_TAR);
} else {
agentImagePath = new Path(agentImage);
}
if (fileSystem.getFileSystem().exists(agentImagePath)) {
LocalResource agentImageRes = fileSystem.createAmResource(agentImagePath, LocalResourceType.ARCHIVE);
- launcher.addLocalResource(AgentKeys.AGENT_INSTALL_DIR, agentImageRes);
+ launcher.addLocalResource(AGENT_INSTALL_DIR, agentImageRes);
} else {
String msg =
String.format("Required agent image slider-agent.tar.gz is unavailable at %s", agentImagePath.toString());
MapOperations compOps = appComponent;
boolean relaxVerificationForTest = compOps != null ? Boolean.valueOf(compOps.
- getOptionBool(AgentKeys.TEST_RELAX_VERIFICATION, false)) : false;
+ getOptionBool(TEST_RELAX_VERIFICATION, false)) : false;
log.error(msg);
if (!relaxVerificationForTest) {
@@ -492,7 +468,7 @@ public class AgentProviderService extends AbstractProviderService implements
LocalResource appDefRes = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(new Path(appDef)),
LocalResourceType.ARCHIVE);
- launcher.addLocalResource(AgentKeys.APP_DEFINITION_DIR, appDefRes);
+ launcher.addLocalResource(APP_DEFINITION_DIR, appDefRes);
for (Package pkg : getMetaInfo(roleGroup).getApplication().getPackages()) {
Path pkgPath = fileSystem.buildResourcePath(pkg.getName());
@@ -511,57 +487,61 @@ public class AgentProviderService extends AbstractProviderService implements
}
LocalResource packageResource = fileSystem.createAmResource(
pkgPath, type);
- launcher.addLocalResource(AgentKeys.APP_PACKAGES_DIR, packageResource);
+ launcher.addLocalResource(APP_PACKAGES_DIR, packageResource);
}
String agentConf = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_CONF, "");
+ getGlobalOptions().getOption(AGENT_CONF, "");
if (SliderUtils.isSet(agentConf)) {
LocalResource agentConfRes = fileSystem.createAmResource(fileSystem
.getFileSystem().resolvePath(new Path(agentConf)),
LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes);
+ launcher.addLocalResource(AGENT_CONFIG_FILE, agentConfRes);
}
String agentVer = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_VERSION, null);
+ getGlobalOptions().getOption(AGENT_VERSION, null);
if (agentVer != null) {
LocalResource agentVerRes = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(new Path(agentVer)),
LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.AGENT_VERSION_FILE, agentVerRes);
+ launcher.addLocalResource(AGENT_VERSION_FILE, agentVerRes);
}
if (SliderUtils.isHadoopClusterSecure(getConfig())) {
- localizeServiceKeytabs(launcher, instanceDefinition, fileSystem);
+ providerUtils.localizeServiceKeytabs(launcher, instanceDefinition,
+ fileSystem, getClusterName());
}
MapOperations amComponent = instanceDefinition.
- getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
- boolean twoWayEnabled = amComponent != null ? Boolean.valueOf(amComponent.
- getOptionBool(AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) : false;
- if (twoWayEnabled) {
- localizeContainerSSLResources(launcher, container, fileSystem);
+ getAppConfOperations().getComponent(COMPONENT_AM);
+ if (providerUtils.hasTwoWaySSLEnabled(amComponent)) {
+ providerUtils.localizeContainerSSLResources(launcher, container,
+ fileSystem, getClusterName());
}
- MapOperations compOps = appComponent;
- if (areStoresRequested(compOps)) {
- localizeContainerSecurityStores(launcher, container, roleName, fileSystem,
- instanceDefinition, compOps);
+ if (providerUtils.areStoresRequested(appComponent)) {
+ providerUtils.localizeContainerSecurityStores(launcher, container,
+ roleName, fileSystem, instanceDefinition, appComponent,
+ getClusterName());
}
//add the configuration resources
launcher.addLocalResources(fileSystem.submitDirectory(
generatedConfPath,
- SliderKeys.PROPAGATED_CONF_DIR_NAME));
+ PROPAGATED_CONF_DIR_NAME));
- if (appComponent.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, false)) {
+ if (appComponent.getOptionBool(AM_CONFIG_GENERATION, false)) {
// build and localize configuration files
Map<String, Map<String, String>> configurations =
buildCommandConfigurations(instanceDefinition.getAppConfOperations(),
+ instanceDefinition.getInternalOperations(),
container.getId().toString(), roleName, roleGroup);
- localizeConfigFiles(launcher, roleName, roleGroup, getMetaInfo(roleGroup),
- configurations, launcher.getEnv(), fileSystem);
+ for (ConfigFile configFile : getMetaInfo(roleGroup)
+ .getComponentConfigFiles(roleGroup)) {
+ localizeConfigFile(launcher, roleName, roleGroup, configFile,
+ configurations, launcher.getEnv(), fileSystem);
+ }
}
String label = getContainerLabel(container, roleName, roleGroup);
@@ -569,7 +549,7 @@ public class AgentProviderService extends AbstractProviderService implements
String pythonExec = instanceDefinition.getAppConfOperations()
.getGlobalOptions().getOption(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH,
- AgentKeys.PYTHON_EXE);
+ PYTHON_EXE);
operation.add(pythonExec);
@@ -587,13 +567,13 @@ public class AgentProviderService extends AbstractProviderService implements
}
operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/"
- + AgentKeys.AGENT_OUT_FILE + " 2>&1");
+ + AGENT_OUT_FILE + " 2>&1");
launcher.addCommand(operation.build());
// localize addon package
String addonAppDefString = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(AgentKeys.ADDONS, null);
+ .getGlobalOptions().getOption(ADDONS, null);
log.debug("All addon appdefs: {}", addonAppDefString);
if (addonAppDefString != null) {
Scanner scanner = new Scanner(addonAppDefString).useDelimiter(",");
@@ -605,7 +585,7 @@ public class AgentProviderService extends AbstractProviderService implements
LocalResource addonPkgRes = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(new Path(addonAppDefPath)),
LocalResourceType.ARCHIVE);
- launcher.addLocalResource(AgentKeys.ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes);
+ launcher.addLocalResource(ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes);
}
log.debug("Metainfo map for master and addon: {}",
packageMetainfo.toString());
@@ -613,7 +593,7 @@ public class AgentProviderService extends AbstractProviderService implements
// Additional files to localize in addition to the application def
String appResourcesString = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(AgentKeys.APP_RESOURCES, null);
+ .getGlobalOptions().getOption(APP_RESOURCES, null);
log.info("Configuration value for extra resources to localize: {}", appResourcesString);
if (null != appResourcesString) {
try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) {
@@ -623,7 +603,7 @@ public class AgentProviderService extends AbstractProviderService implements
LocalResource extraResource = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(resourcePath),
LocalResourceType.FILE);
- String destination = AgentKeys.APP_RESOURCES_DIR + "/" + resourcePath.getName();
+ String destination = APP_RESOURCES_DIR + "/" + resourcePath.getName();
log.info("Localizing {} to {}", resourcePath, destination);
// TODO Can we try harder to avoid collisions?
launcher.addLocalResource(destination, extraResource);
@@ -641,7 +621,7 @@ public class AgentProviderService extends AbstractProviderService implements
log.debug("Current component: {} component in metainfo: {}", roleName,
comp.getName());
if (comp.getName().equals(roleGroup)
- || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) {
+ || comp.getName().equals(ADDON_FOR_ALL_COMPONENTS)) {
pkgStatuses.put(appPkg.getApplicationPackage().getName(), State.INIT);
}
}
@@ -658,253 +638,18 @@ public class AgentProviderService extends AbstractProviderService implements
pkgStatuses));
}
- private void localizeContainerSecurityStores(ContainerLauncher launcher,
- Container container,
- String role,
- SliderFileSystem fileSystem,
- AggregateConf instanceDefinition,
- MapOperations compOps)
- throws SliderException, IOException {
- // substitute CLUSTER_NAME into credentials
- Map<String,List<String>> newcred = new HashMap<>();
- for (Entry<String,List<String>> entry : instanceDefinition.getAppConf().credentials.entrySet()) {
- List<String> resultList = new ArrayList<>();
- for (String v : entry.getValue()) {
- resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"),
- clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
- clusterName));
- }
- newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"),
- clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
- clusterName),
- resultList);
- }
- instanceDefinition.getAppConf().credentials = newcred;
-
- // generate and localize security stores
- SecurityStore[] stores = generateSecurityStores(container, role,
- instanceDefinition, compOps);
- for (SecurityStore store : stores) {
- LocalResource keystoreResource = fileSystem.createAmResource(
- uploadSecurityResource(store.getFile(), fileSystem), LocalResourceType.FILE);
- launcher.addLocalResource(String.format("secstores/%s-%s.p12",
- store.getType(), role),
- keystoreResource);
- }
- }
-
- private SecurityStore[] generateSecurityStores(Container container,
- String role,
- AggregateConf instanceDefinition,
- MapOperations compOps)
- throws SliderException, IOException {
- return StoresGenerator.generateSecurityStores(container.getNodeId().getHost(),
- container.getId().toString(), role,
- instanceDefinition, compOps);
- }
-
- private boolean areStoresRequested(MapOperations compOps) {
- return compOps != null ? compOps.
- getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false) : false;
- }
-
- private void localizeContainerSSLResources(ContainerLauncher launcher,
- Container container,
- SliderFileSystem fileSystem)
- throws SliderException {
- try {
- // localize server cert
- Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName());
- LocalResource certResource = fileSystem.createAmResource(
- new Path(certsDir, SliderKeys.CRT_FILE_NAME),
- LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.CERT_FILE_LOCALIZATION_PATH,
- certResource);
-
- // generate and localize agent cert
- CertificateManager certMgr = new CertificateManager();
- String hostname = container.getNodeId().getHost();
- String containerId = container.getId().toString();
- certMgr.generateContainerCertificate(hostname, containerId);
- LocalResource agentCertResource = fileSystem.createAmResource(
- uploadSecurityResource(
- CertificateManager.getAgentCertficateFilePath(containerId),
- fileSystem), LocalResourceType.FILE);
- // still using hostname as file name on the agent side, but the files
- // do end up under the specific container's file space
- launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname +
- ".crt", agentCertResource);
- LocalResource agentKeyResource = fileSystem.createAmResource(
- uploadSecurityResource(
- CertificateManager.getAgentKeyFilePath(containerId), fileSystem),
- LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname +
- ".key", agentKeyResource);
-
- } catch (Exception e) {
- throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e,
- "Unable to localize certificates. Two-way SSL cannot be enabled");
- }
- }
-
- private Path uploadSecurityResource(File resource, SliderFileSystem fileSystem)
- throws IOException {
- Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName());
- return uploadResource(resource, fileSystem, certsDir);
- }
-
- private Path uploadResource(File resource, SliderFileSystem fileSystem,
- String roleName) throws IOException {
- Path dir;
- if (roleName == null) {
- dir = fileSystem.buildClusterResourcePath(getClusterName());
- } else {
- dir = fileSystem.buildClusterResourcePath(getClusterName(), roleName);
- }
- return uploadResource(resource, fileSystem, dir);
- }
-
- private static synchronized Path uploadResource(File resource,
- SliderFileSystem fileSystem, Path parentDir) throws IOException {
- if (!fileSystem.getFileSystem().exists(parentDir)) {
- fileSystem.getFileSystem().mkdirs(parentDir,
- new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
- }
- Path destPath = new Path(parentDir, resource.getName());
- if (!fileSystem.getFileSystem().exists(destPath)) {
- FSDataOutputStream os = null;
- try {
- os = fileSystem.getFileSystem().create(destPath);
- byte[] contents = FileUtils.readFileToByteArray(resource);
- os.write(contents, 0, contents.length);
- os.flush();
- } finally {
- IOUtils.closeStream(os);
- }
- log.info("Uploaded {} to localization path {}", resource, destPath);
- } else {
- log.info("Resource {} already existed at localization path {}", resource,
- destPath);
- }
-
- while (!fileSystem.getFileSystem().exists(destPath)) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // ignore
- }
- }
-
- fileSystem.getFileSystem().setPermission(destPath,
- new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
-
- return destPath;
- }
-
- private void localizeServiceKeytabs(ContainerLauncher launcher,
- AggregateConf instanceDefinition,
- SliderFileSystem fileSystem)
- throws IOException {
- String keytabPathOnHost = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM).get(
- SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
- if (SliderUtils.isUnset(keytabPathOnHost)) {
- String amKeytabName = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM).get(
- SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
- String keytabDir = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM).get(
- SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
- // we need to localize the keytab files in the directory
- Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
- getClusterName());
- boolean serviceKeytabsDeployed = false;
- if (fileSystem.getFileSystem().exists(keytabDirPath)) {
- FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(keytabDirPath);
- LocalResource keytabRes;
- for (FileStatus keytab : keytabs) {
- if (!amKeytabName.equals(keytab.getPath().getName())
- && keytab.getPath().getName().endsWith(".keytab")) {
- serviceKeytabsDeployed = true;
- log.info("Localizing keytab {}", keytab.getPath().getName());
- keytabRes = fileSystem.createAmResource(keytab.getPath(),
- LocalResourceType.FILE);
- launcher.addLocalResource(SliderKeys.KEYTAB_DIR + "/" +
- keytab.getPath().getName(),
- keytabRes);
- }
- }
- }
- if (!serviceKeytabsDeployed) {
- log.warn("No service keytabs for the application have been localized. "
- + "If the application requires keytabs for secure operation, "
- + "please ensure that the required keytabs have been uploaded "
- + "to the folder {}", keytabDirPath);
- }
- }
- }
-
- private void createConfigFile(SliderFileSystem fileSystem, File file,
- ConfigFile configFile, Map<String, String> config)
- throws IOException {
- ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType());
- log.info("Writing {} file {}", configFormat, file);
-
- ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
- fileSystem, getClusterName(), file.getName());
- PublishedConfiguration publishedConfiguration =
- new PublishedConfiguration(configFile.getDictionaryName(),
- config.entrySet());
- PublishedConfigurationOutputter configurationOutputter =
- PublishedConfigurationOutputter.createOutputter(configFormat,
- publishedConfiguration);
- configurationOutputter.save(file);
- }
-
@VisibleForTesting
- protected void localizeConfigFiles(ContainerLauncher launcher,
+ protected void localizeConfigFile(ContainerLauncher launcher,
String roleName, String roleGroup,
- Metainfo metainfo,
+ ConfigFile configFile,
Map<String, Map<String, String>> configs,
MapOperations env,
SliderFileSystem fileSystem)
throws IOException {
- for (ConfigFile configFile : metainfo.getComponentConfigFiles(roleGroup)) {
- Map<String, String> config = ConfigUtils.replacePropsInConfig(
- configs.get(configFile.getDictionaryName()), env.options);
- String fileName = ConfigUtils.replaceProps(config,
- configFile.getFileName());
- File localFile = new File(SliderKeys.RESOURCE_DIR);
- if (!localFile.exists()) {
- localFile.mkdir();
- }
- localFile = new File(localFile, new File(fileName).getName());
-
- String folder = null;
- if ("true".equals(config.get(PER_COMPONENT))) {
- folder = roleName;
- } else if ("true".equals(config.get(PER_GROUP))) {
- folder = roleGroup;
- }
-
- log.info("Localizing {} configs to config file {} (destination {}) " +
- "based on {} configs", config.size(), localFile, fileName,
- configFile.getDictionaryName());
- createConfigFile(fileSystem, localFile, configFile, config);
- Path destPath = uploadResource(localFile, fileSystem, folder);
- LocalResource configResource = fileSystem.createAmResource(destPath,
- LocalResourceType.FILE);
-
- File destFile = new File(fileName);
- if (destFile.isAbsolute()) {
- launcher.addLocalResource(
- SliderKeys.RESOURCE_DIR + "/" + destFile.getName(),
- configResource, fileName);
- } else {
- launcher.addLocalResource(AgentKeys.APP_CONF_DIR + "/" + fileName,
- configResource);
- }
- }
+ ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType());
+ providerUtils.localizeConfigFile(launcher, roleName, roleGroup,
+ configFile.getDictionaryName(), configFormat, configFile.getFileName(),
+ configs, env, fileSystem, getClusterName());
}
/**
@@ -1053,7 +798,6 @@ public class AgentProviderService extends AbstractProviderService implements
doUpgrade = true;
}
- StateAccessForProviders accessor = getAmState();
CommandScript cmdScript = getScriptPathForMasterPackage(roleGroup);
List<ComponentCommand> commands = getApplicationComponent(roleGroup).getCommands();
@@ -1089,45 +833,10 @@ public class AgentProviderService extends AbstractProviderService implements
List<ComponentStatus> statuses = heartBeat.getComponentStatus();
if (statuses != null && !statuses.isEmpty()) {
log.info("status from agent: " + statuses.toString());
- try {
- for(ComponentStatus status : statuses){
- RoleInstance role = null;
- if(status.getIp() != null && !status.getIp().isEmpty()){
- role = amState.getOwnedContainer(containerId);
- role.ip = status.getIp();
- }
- if(status.getHostname() != null && !status.getHostname().isEmpty()){
- role = amState.getOwnedContainer(containerId);
- role.hostname = status.getHostname();
- }
- if (role != null) {
- // create an updated service record (including hostname and ip) and publish...
- ServiceRecord record = new ServiceRecord();
- record.set(YarnRegistryAttributes.YARN_ID, containerId);
- record.description = roleName;
- record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
- PersistencePolicies.CONTAINER);
- // TODO: switch record attributes to use constants from YarnRegistryAttributes
- // when it's been updated.
- if (role.ip != null) {
- record.set("yarn:ip", role.ip);
- }
- if (role.hostname != null) {
- record.set("yarn:hostname", role.hostname);
- }
- yarnRegistry.putComponent(
- RegistryPathUtils.encodeYarnID(containerId), record);
-
- }
- }
-
-
- } catch (NoSuchNodeException e) {
- // ignore - there is nothing to do if we don't find a container
- log.warn("Owned container {} not found - {}", containerId, e);
- } catch (IOException e) {
- log.warn("Error updating container {} service record in registry",
- containerId, e);
+ for(ComponentStatus status : statuses){
+ providerUtils.updateServiceRecord(getAmState(), yarnRegistry,
+ containerId, roleName, Collections.singletonList(status.getIp()),
+ status.getHostname());
}
}
@@ -1179,8 +888,8 @@ public class AgentProviderService extends AbstractProviderService implements
}
}
- int waitForCount = accessor.getInstanceDefinitionSnapshot().
- getAppConfOperations().getComponentOptInt(roleGroup, AgentKeys.WAIT_HEARTBEAT, 0);
+ int waitForCount = getAmState().getInstanceDefinitionSnapshot().
+ getAppConfOperations().getComponentOptInt(roleGroup, WAIT_HEARTBEAT, 0);
if (id < waitForCount) {
log.info("Waiting until heartbeat count {}. Current val: {}", waitForCount, id);
@@ -1223,7 +932,7 @@ public class AgentProviderService extends AbstractProviderService implements
log.debug("Addon component: {} pkg: {} script: {}", comp.getName(),
nextPkgToInstall, comp.getCommandScript().getScript());
if (comp.getName().equals(roleGroup)
- || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) {
+ || comp.getName().equals(ADDON_FOR_ALL_COMPONENTS)) {
scriptPath = comp.getCommandScript().getScript();
if (scriptPath != null) {
addInstallCommand(roleName, roleGroup, containerId, response,
@@ -1406,15 +1115,12 @@ public class AgentProviderService extends AbstractProviderService implements
return details;
}
- @Override
public void applyInitialRegistryDefinitions(URL amWebURI,
URL agentOpsURI,
URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException {
super.applyInitialRegistryDefinitions(amWebURI,
- agentOpsURI,
- agentStatusURI,
serviceRecord);
try {
@@ -1450,20 +1156,17 @@ public class AgentProviderService extends AbstractProviderService implements
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
MapOperations clientOperations = appConf.getOrAddComponent(client.getName());
appConf.resolve();
- if (!clientOperations.getOptionBool(AgentKeys.AM_CONFIG_GENERATION,
+ if (!clientOperations.getOptionBool(AM_CONFIG_GENERATION,
false)) {
log.info("AM config generation is false, not publishing client configs");
return;
}
// build and localize configuration files
- Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
- Map<String, String> tokens = null;
- try {
- tokens = getStandardTokenMap(appConf, client.getName(), client.getName());
- } catch (SliderException e) {
- throw new IOException(e);
- }
+ Map<String, Map<String, String>> configurations = new TreeMap<>();
+ Map<String, String> tokens = providerUtils.getStandardTokenMap(appConf,
+ getAmState().getInternalsSnapshot(), client.getName(),
+ client.getName(), getClusterName());
for (ConfigFile configFile : getMetaInfo().getComponentConfigFiles(client.getName())) {
addNamedConfiguration(configFile.getDictionaryName(),
@@ -1561,7 +1264,7 @@ public class AgentProviderService extends AbstractProviderService implements
*/
private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) {
String hbMonitorInterval = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL,
+ getGlobalOptions().getOption(HEARTBEAT_MONITOR_INTERVAL,
Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL));
try {
setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval));
@@ -1581,7 +1284,7 @@ public class AgentProviderService extends AbstractProviderService implements
*/
private void initializeAgentDebugCommands(AggregateConf instanceDefinition) {
String launchParameterStr = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, "");
+ getGlobalOptions().getOption(AGENT_INSTANCE_DEBUG_DATA, "");
agentLaunchParameter = new AgentLaunchParameter(launchParameterStr);
}
@@ -1699,20 +1402,11 @@ public class AgentProviderService extends AbstractProviderService implements
return clusterName;
}
- /**
- * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site
- *
- * @param name
- * @param description
- * @param entries
- */
+ @VisibleForTesting
protected void publishApplicationInstanceData(String name, String description,
Iterable<Map.Entry<String, String>> entries) {
- PublishedConfiguration pubconf = new PublishedConfiguration();
- pubconf.description = description;
- pubconf.putValues(entries);
- log.info("publishing {}", pubconf);
- getAmState().getPublishedSliderConfigurations().put(name, pubconf);
+ providerUtils.publishApplicationInstanceData(name, description, entries,
+ getAmState());
}
/**
@@ -1771,72 +1465,14 @@ public class AgentProviderService extends AbstractProviderService implements
return stats;
}
-
- /**
- * Format the folder locations and publish in the registry service
- *
- * @param folders
- * @param containerId
- * @param hostFqdn
- * @param componentName
- */
+ @VisibleForTesting
protected void publishFolderPaths(
Map<String, String> folders, String containerId, String componentName, String hostFqdn) {
- Date now = new Date();
- for (Map.Entry<String, String> entry : folders.entrySet()) {
- ExportEntry exportEntry = new ExportEntry();
- exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue()));
- exportEntry.setContainerId(containerId);
- exportEntry.setLevel(COMPONENT_TAG);
- exportEntry.setTag(componentName);
- exportEntry.setUpdatedTime(now.toString());
- if (entry.getKey().equals("AGENT_LOG_ROOT")) {
- synchronized (logFolderExports) {
- getLogFolderExports().put(containerId, exportEntry);
- }
- } else {
- synchronized (workFolderExports) {
- getWorkFolderExports().put(containerId, exportEntry);
- }
- }
- log.info("Updating log and pwd folders for container {}", containerId);
- }
-
- PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
- exports.setUpdated(now.getTime());
- synchronized (logFolderExports) {
- updateExportsFromList(exports, getLogFolderExports());
- }
- getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
-
- exports = new PublishedExports(CONTAINER_PWDS_TAG);
- exports.setUpdated(now.getTime());
- synchronized (workFolderExports) {
- updateExportsFromList(exports, getWorkFolderExports());
- }
- getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
+ providerUtils.publishFolderPaths(folders, containerId, componentName, hostFqdn,
+ getAmState(), getLogFolderExports(), getWorkFolderExports());
}
/**
- * Update the export data from the map
- * @param exports
- * @param folderExports
- */
- private void updateExportsFromList(PublishedExports exports, Map<String, ExportEntry> folderExports) {
- Map<String, List<ExportEntry>> perComponentList = new HashMap<String, List<ExportEntry>>();
- for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet())
- {
- String componentName = logEntry.getValue().getTag();
- if (!perComponentList.containsKey(componentName)) {
- perComponentList.put(componentName, new ArrayList<ExportEntry>());
- }
- perComponentList.get(componentName).add(logEntry.getValue());
- }
- exports.putValues(perComponentList.entrySet());
- }
-
-
- /**
* Process return status for component instances
*
* @param heartBeat
@@ -1855,7 +1491,7 @@ public class AgentProviderService extends AbstractProviderService implements
if ((!canAnyMasterPublishConfig(componentGroup) || canPublishConfig(componentGroup)) &&
!getAmState().getAppConfSnapshot().getComponentOptBool(
- componentGroup, AgentKeys.AM_CONFIG_GENERATION, false)) {
+ componentGroup, AM_CONFIG_GENERATION, false)) {
// If no Master can explicitly publish then publish if its a master
// Otherwise, wait till the master that can publish is ready
@@ -1899,7 +1535,8 @@ public class AgentProviderService extends AbstractProviderService implements
// publish export groups if any
Map<String, String> replaceTokens = new HashMap<String, String>();
for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
- String hostName = getHostsList(entry.getValue().values(), true).iterator().next();
+ String hostName = providerUtils.getHostsList(
+ entry.getValue().values(), true).iterator().next();
replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName);
}
@@ -1967,28 +1604,24 @@ public class AgentProviderService extends AbstractProviderService implements
}
private void publishModifiedExportGroups(Set<String> modifiedGroups) {
- for (String groupName : modifiedGroups) {
- Map<String, List<ExportEntry>> entries = this.exportGroups.get(groupName);
-
+ for (String roleGroup : modifiedGroups) {
+ Map<String, List<ExportEntry>> entries = this.exportGroups.get(roleGroup);
// Publish in old format for the time being
Map<String, String> simpleEntries = new HashMap<String, String>();
- for (Map.Entry<String, List<ExportEntry>> entry : entries.entrySet()) {
+ for (Entry<String, List<ExportEntry>> entry : entries.entrySet()) {
List<ExportEntry> exports = entry.getValue();
if (SliderUtils.isNotEmpty(exports)) {
// there is no support for multiple exports per name - so extract only the first one
simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
}
}
- if (!getAmState().getAppConfSnapshot().getComponentOptBool(
- groupName, AgentKeys.AM_CONFIG_GENERATION, false)) {
- publishApplicationInstanceData(groupName, groupName,
- simpleEntries.entrySet());
- }
+ publishApplicationInstanceData(roleGroup, roleGroup,
+ simpleEntries.entrySet());
- PublishedExports exports = new PublishedExports(groupName);
+ PublishedExports exports = new PublishedExports(roleGroup);
exports.setUpdated(new Date().getTime());
exports.putValues(entries.entrySet());
- getAmState().getPublishedExportsSet().put(groupName, exports);
+ getAmState().getPublishedExportsSet().put(roleGroup, exports);
}
}
@@ -2310,7 +1943,8 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setHostLevelParams(hostLevelParams);
Map<String, Map<String, String>> configurations =
- buildCommandConfigurations(appConf, containerId, roleName, roleGroup);
+ buildCommandConfigurations(appConf, getAmState().getInternalsSnapshot(),
+ containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
@@ -2359,7 +1993,8 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setHostLevelParams(hostLevelParams);
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, getAmState().getInternalsSnapshot(), containerId, roleName,
+ roleGroup);
cmd.setConfigurations(configurations);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
@@ -2522,7 +2157,9 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setCommandParams(commandParametersSet(scriptPath, timeout, false));
- Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, roleName, roleGroup);
+ Map<String, Map<String, String>> configurations =
+ buildCommandConfigurations(appConf, getAmState().getInternalsSnapshot(),
+ containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
@@ -2557,7 +2194,8 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setCommandParams(setCommandParameters(scriptPath, timeout, false));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, getAmState().getInternalsSnapshot(), containerId, roleName,
+ roleGroup);
Map<String, String> dockerConfig = new HashMap<String, String>();
String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "statusCommand");
if (statusCommand == null) {
@@ -2598,7 +2236,8 @@ public class AgentProviderService extends AbstractProviderService implements
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, getAmState().getInternalsSnapshot(), containerId, roleName,
+ roleGroup);
Map<String, String> dockerConfig = new HashMap<String, String>();
String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "statusCommand");
if (statusCommand == null) {
@@ -2690,7 +2329,8 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setCommandParams(setCommandParameters(startCommand, timeout, true));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, getAmState().getInternalsSnapshot(), containerId, roleName,
+ roleGroup);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
@@ -2794,13 +2434,6 @@ public class AgentProviderService extends AbstractProviderService implements
result = container.getNetwork();
}
break;
- case "useNetworkScript":
- if (container.getUseNetworkScript() == null || container.getUseNetworkScript().isEmpty()) {
- result = "yes";
- } else {
- result = container.getUseNetworkScript();
- }
- break;
case "statusCommand":
result = container.getStatusCommand();
break;
@@ -2903,7 +2536,9 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setRoleParams(roleParams);
cmd.getRoleParams().put("auto_restart", Boolean.toString(isMarkedAutoRestart));
- Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, roleName, roleGroup);
+ Map<String, Map<String, String>> configurations =
+ buildCommandConfigurations(appConf, internalsConf, containerId,
+ roleName, roleGroup);
cmd.setConfigurations(configurations);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
@@ -2957,7 +2592,7 @@ public class AgentProviderService extends AbstractProviderService implements
Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, internalsConf, containerId, roleName, roleGroup);
cmdStop.setConfigurations(configurationsStop);
response.addExecutionCommand(cmdStop);
}
@@ -2989,7 +2624,7 @@ public class AgentProviderService extends AbstractProviderService implements
cmd.setCommandParams(commandParametersSet(scriptPath, timeout, true));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, internalsConf, containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
response.addExecutionCommand(cmd);
}
@@ -3023,7 +2658,7 @@ public class AgentProviderService extends AbstractProviderService implements
cmdStop.setCommandParams(commandParametersSet(scriptPath, timeout, true));
Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations(
- appConf, containerId, roleName, roleGroup);
+ appConf, internalsConf, containerId, roleName, roleGroup);
cmdStop.setConfigurations(configurationsStop);
response.addExecutionCommand(cmdStop);
}
@@ -3062,12 +2697,13 @@ public class AgentProviderService extends AbstractProviderService implements
}
private Map<String, Map<String, String>> buildCommandConfigurations(
- ConfTreeOperations appConf, String containerId, String roleName, String roleGroup)
+ ConfTreeOperations appConf, ConfTreeOperations internalsConf,
+ String containerId, String roleName, String roleGroup)
throws SliderException {
- Map<String, Map<String, String>> configurations =
- new TreeMap<String, Map<String, String>>();
- Map<String, String> tokens = getStandardTokenMap(appConf, roleName, roleGroup);
+ Map<String, Map<String, String>> configurations = new TreeMap<>();
+ Map<String, String> tokens = providerUtils.getStandardTokenMap(appConf,
+ internalsConf, roleName, roleGroup, getClusterName());
tokens.put("${CONTAINER_ID}", containerId);
Set<String> configs = new HashSet<String>();
@@ -3090,111 +2726,16 @@ public class AgentProviderService extends AbstractProviderService implements
return configurations;
}
+ @VisibleForTesting
protected void dereferenceAllConfigs(Map<String, Map<String, String>> configurations) {
- Map<String, String> allConfigs = new HashMap<String, String>();
- String lookupFormat = "${@//site/%s/%s}";
- for (String configType : configurations.keySet()) {
- Map<String, String> configBucket = configurations.get(configType);
- for (String configName : configBucket.keySet()) {
- allConfigs.put(String.format(lookupFormat, configType, configName), configBucket.get(configName));
- }
- }
-
- boolean finished = false;
- while (!finished) {
- finished = true;
- for (Map.Entry<String, String> entry : allConfigs.entrySet()) {
- String configValue = entry.getValue();
- for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
- String lookUpValue = lookUpEntry.getValue();
- if (lookUpValue.contains("${@//site/")) {
- continue;
- }
- String lookUpKey = lookUpEntry.getKey();
- if (configValue != null && configValue.contains(lookUpKey)) {
- configValue = configValue.replace(lookUpKey, lookUpValue);
- }
- }
- if (!configValue.equals(entry.getValue())) {
- finished = false;
- allConfigs.put(entry.getKey(), configValue);
- }
- }
- }
-
- for (String configType : configurations.keySet()) {
- Map<String, String> configBucket = configurations.get(configType);
- for (Map.Entry<String, String> entry: configBucket.entrySet()) {
- String configName = entry.getKey();
- String configValue = entry.getValue();
- for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
- String lookUpValue = lookUpEntry.getValue();
- if (lookUpValue.contains("${@//site/")) {
- continue;
- }
- String lookUpKey = lookUpEntry.getKey();
- if (configValue != null && configValue.contains(lookUpKey)) {
- configValue = configValue.replace(lookUpKey, lookUpValue);
- }
- }
- configBucket.put(configName, configValue);
- }
- }
- }
-
- private Map<String, String> getStandardTokenMap(ConfTreeOperations appConf,
- String componentName, String componentGroup) throws SliderException {
- Map<String, String> tokens = new HashMap<String, String>();
- String nnuri = appConf.get("site.fs.defaultFS");
- tokens.put("${NN_URI}", nnuri);
- tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
- tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS));
- tokens.put("${DEFAULT_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH));
- String prefix = appConf.getComponentOpt(componentGroup, ROLE_PREFIX,
- null);
- String dataDirSuffix = "";
- if (prefix == null) {
- prefix = "";
- } else {
- dataDirSuffix = "_" + SliderUtils.trimPrefix(prefix);
- }
- tokens.put("${DEFAULT_DATA_DIR}", getAmState()
- .getInternalsSnapshot()
- .getGlobalOptions()
- .getMandatoryOption(InternalKeys.INTERNAL_DATA_DIR_PATH) + dataDirSuffix);
- tokens.put("${JAVA_HOME}", appConf.get(AgentKeys.JAVA_HOME));
- tokens.put("${COMPONENT_NAME}", componentName);
- tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase());
- tokens.put("${COMPONENT_PREFIX}", prefix);
- tokens.put("${COMPONENT_PREFIX.lc}", prefix.toLowerCase());
- if (!componentName.equals(componentGroup) && componentName.startsWith(componentGroup)) {
- tokens.put("${COMPONENT_ID}", componentName.substring(componentGroup.length()));
- }
- tokens.put("${CLUSTER_NAME}", getClusterName());
- tokens.put("${CLUSTER_NAME.lc}", getClusterName().toLowerCase());
- tokens.put("${APP_NAME}", getClusterName());
- tokens.put("${APP_NAME.lc}", getClusterName().toLowerCase());
- tokens.put("${APP_COMPONENT_NAME}", componentName);
- tokens.put("${APP_COMPONENT_NAME.lc}", componentName.toLowerCase());
- return tokens;
+ providerUtils.dereferenceAllConfigs(configurations);
}
@VisibleForTesting
- protected List<String> getSystemConfigurationsRequested(ConfTreeOperations appConf) {
- List<String> configList = new ArrayList<String>();
-
- String configTypes = appConf.get(AgentKeys.SYSTEM_CONFIGS);
- if (configTypes != null && configTypes.length() > 0) {
- String[] configs = configTypes.split(",");
- for (String config : configs) {
- configList.add(config.trim());
- }
- }
-
- return new ArrayList<String>(new HashSet<String>(configList));
+ protected Set<String> getSystemConfigurationsRequested(ConfTreeOperations appConf) {
+ return providerUtils.getSystemConfigurationsRequested(appConf);
}
-
@VisibleForTesting
protected List<String> getApplicationConfigurationTypes(String roleGroup) {
List<String> configList = new ArrayList<String>();
@@ -3275,30 +2816,17 @@ public class AgentProviderService extends AbstractProviderService implements
configurations.put(configName, config);
}
+ @VisibleForTesting
protected void addRoleRelatedTokens(Map<String, String> tokens) {
- for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
- String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
- String hosts = StringUtils.join(",", getHostsList(entry.getValue().values(), true));
- tokens.put("${" + tokenName + "}", hosts);
- }
- }
-
- private Iterable<String> getHostsList(Collection<ClusterNode> values,
- boolean hostOnly) {
- List<String> hosts = new ArrayList<String>();
- for (ClusterNode cn : values) {
- hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name);
- }
-
- return hosts;
+ providerUtils.addRoleRelatedTokens(tokens, getAmState());
}
private void addDefaultGlobalConfig(Map<String, String> config, String containerId, String roleName) {
config.put("app_log_dir", "${AGENT_LOG_ROOT}");
config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run");
config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install");
- config.put("app_conf_dir", "${AGENT_WORK_ROOT}/" + AgentKeys.APP_CONF_DIR);
- config.put("app_input_conf_dir", "${AGENT_WORK_ROOT}/" + SliderKeys.PROPAGATED_CONF_DIR_NAME);
+ config.put("app_conf_dir", "${AGENT_WORK_ROOT}/" + APP_CONF_DIR);
+ config.put("app_input_conf_dir", "${AGENT_WORK_ROOT}/" + PROPAGATED_CONF_DIR_NAME);
config.put("app_container_id", containerId);
config.put("app_container_tag", tags.getTag(roleName, containerId));
@@ -3315,7 +2843,8 @@ public class AgentProviderService extends AbstractProviderService implements
for (Map.Entry<String, Map<String, ClusterNode>> entry :
getRoleClusterNodeMapping().entrySet()) {
details.put(entry.getKey() + " Host(s)/Container(s)",
- new MonitorDetail(getHostsList(entry.getValue().values(), false).toString(), false));
+ new MonitorDetail(providerUtils.getHostsList(
+ entry.getValue().values(), false).toString(), false));
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java
new file mode 100644
index 0000000..13473e5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerClientProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.docker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.providers.ProviderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.slider.providers.docker.DockerKeys.DOCKER_IMAGE;
+
+public class DockerClientProvider extends AbstractClientProvider
+ implements SliderKeys {
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(DockerClientProvider.class);
+ private static final ProviderUtils providerUtils = new ProviderUtils(log);
+ protected static final String NAME = "docker";
+
+ public DockerClientProvider(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public List<ProviderRole> getRoles() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void validateInstanceDefinition(AggregateConf instanceDefinition,
+ SliderFileSystem fs) throws SliderException {
+ super.validateInstanceDefinition(instanceDefinition, fs);
+
+ ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+ ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+
+ for (String roleGroup : resources.getComponentNames()) {
+ if (roleGroup.equals(COMPONENT_AM)) {
+ continue;
+ }
+ if (appConf.getComponentOpt(roleGroup, DOCKER_IMAGE, null) == null &&
+ appConf.getGlobalOptions().get(DOCKER_IMAGE) == null) {
+ throw new BadConfigException("Property " + DOCKER_IMAGE + " not " +
+ "specified for " + roleGroup);
+ }
+
+ providerUtils.getPackages(roleGroup, appConf);
+
+ if (appConf.getComponentOptBool(roleGroup, AM_CONFIG_GENERATION, false)) {
+ // build and localize configuration files
+ Map<String, Map<String, String>> configurations =
+ providerUtils.buildConfigurations(appConf, appConf, null, roleGroup,
+ roleGroup, null);
+ try {
+ providerUtils.localizeConfigFiles(null, roleGroup, roleGroup, appConf,
+ configurations, null, fs, null);
+ } catch (IOException e) {
+ throw new BadConfigException(e.toString());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java
new file mode 100644
index 0000000..40b73a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerKeys.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.docker;
+
+public interface DockerKeys {
+ String PROVIDER_DOCKER = "docker";
+ String DOCKER_PREFIX = "docker.";
+ String DOCKER_IMAGE = DOCKER_PREFIX + "image";
+ String DOCKER_NETWORK = DOCKER_PREFIX + "network";
+ String DOCKER_USE_PRIVILEGED = DOCKER_PREFIX + "usePrivileged";
+ String DOCKER_START_COMMAND = DOCKER_PREFIX + "startCommand";
+
+ String DEFAULT_DOCKER_NETWORK = "bridge";
+
+ String OUT_FILE = "stdout.txt";
+ String ERR_FILE = "stderr.txt";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java
new file mode 100644
index 0000000..5d2592f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/docker/DockerProviderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.docker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.ProviderService;
+import org.apache.slider.providers.SliderProviderFactory;
+
+public class DockerProviderFactory extends SliderProviderFactory {
+
+ public DockerProviderFactory() {
+ }
+
+ public DockerProviderFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public AbstractClientProvider createClientProvider() {
+ return new DockerClientProvider(getConf());
+ }
+
+ @Override
+ public ProviderService createServerProvider() {
+ return new DockerProviderService();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: YARN-5505. Create an agent-less docker
provider in the native-services framework. Contributed by Billie Rinaldi
Posted by ji...@apache.org.
YARN-5505. Create an agent-less docker provider in the native-services framework. Contributed by Billie Rinaldi
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c999964a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c999964a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c999964a
Branch: refs/heads/yarn-native-services
Commit: c999964a068ff6aa38d513793cf12f91089bcb38
Parents: d0eb0e7
Author: Jian He <ji...@apache.org>
Authored: Thu Sep 1 22:38:42 2016 +0800
Committer: Jian He <ji...@apache.org>
Committed: Thu Sep 1 22:39:00 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/slider/api/OptionKeys.java | 15 +-
.../org/apache/slider/client/SliderClient.java | 17 +-
.../org/apache/slider/common/SliderKeys.java | 22 +-
.../apache/slider/common/tools/SliderUtils.java | 4 +
.../slider/core/launch/AbstractLauncher.java | 18 +-
.../PublishedConfigurationOutputter.java | 6 +-
.../providers/AbstractClientProvider.java | 4 +-
.../providers/AbstractProviderService.java | 22 +-
.../slider/providers/ProviderService.java | 12 +-
.../apache/slider/providers/ProviderUtils.java | 1391 ++++++++++++++----
.../providers/agent/AgentClientProvider.java | 36 +-
.../slider/providers/agent/AgentKeys.java | 12 +-
.../providers/agent/AgentProviderService.java | 705 ++-------
.../providers/docker/DockerClientProvider.java | 96 ++
.../slider/providers/docker/DockerKeys.java | 32 +
.../providers/docker/DockerProviderFactory.java | 43 +
.../providers/docker/DockerProviderService.java | 355 +++++
.../slideram/SliderAMProviderService.java | 4 -
.../server/appmaster/SliderAppMaster.java | 39 +-
.../main/resources/org/apache/slider/slider.xml | 4 +
.../slider/providers/docker/appConfig.json | 42 +
.../slider/providers/docker/resources.json | 16 +
.../slider/providers/docker/test.template | 16 +
23 files changed, 1971 insertions(+), 940 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
index a035a99..434b1d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/OptionKeys.java
@@ -41,7 +41,20 @@ public interface OptionKeys extends InternalKeys {
* Prefix for site.xml options: {@value}
*/
String SITE_XML_PREFIX = "site.";
-
+ /**
+ * Prefix for config file options: {@value}
+ */
+ String CONF_FILE_PREFIX = "conf.";
+ /**
+ * Prefix for package options: {@value}
+ */
+ String PKG_FILE_PREFIX = "pkg.";
+ /**
+ * Prefix for export options: {@value}
+ */
+ String EXPORT_PREFIX = "export.";
+ String TYPE_SUFFIX = ".type";
+ String NAME_SUFFIX = ".name";
/**
* Zookeeper quorum host list: {@value}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 3129f6f..5096bb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -151,7 +151,6 @@ import org.apache.slider.core.registry.YarnAppListClient;
import org.apache.slider.core.registry.docstore.ConfigFormat;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
import org.apache.slider.core.registry.docstore.PublishedExportsSet;
@@ -162,6 +161,7 @@ import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.docker.DockerClientProvider;
import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
@@ -2081,7 +2081,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
// add the tags if available
Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
- getApplicationDefinitionPath(appOperations));
+ appOperations);
Credentials credentials = null;
if (clusterSecure) {
@@ -2242,11 +2242,12 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
);
- // TODO: consider supporting apps that don't have an image path
- Path imagePath =
- extractImagePath(sliderFileSystem, internalOptions);
- if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
- log.debug("Registered image path {}", imagePath);
+ if (!(provider instanceof DockerClientProvider)) {
+ Path imagePath =
+ extractImagePath(sliderFileSystem, internalOptions);
+ if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
+ log.debug("Registered image path {}", imagePath);
+ }
}
// build the environment
@@ -3814,7 +3815,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
Path subPath = new Path(path1, appReport.getApplicationId()
.toString() + "/agent");
imagePath = subPath.toString();
- String pathStr = imagePath + "/" + AGENT_TAR;
+ String pathStr = imagePath + "/" + AgentKeys.AGENT_TAR;
try {
validateHDFSFile(sliderFileSystem, pathStr);
log.info("Slider agent package is properly installed at " + pathStr);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index 120b1fc..1484ee3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -81,6 +81,10 @@ public interface SliderKeys extends SliderXmlConfKeys {
String COMPONENT_SEPARATOR = "-";
String[] COMPONENT_KEYS_TO_SKIP = {"zookeeper.", "env.MALLOC_ARENA_MAX",
"site.fs.", "site.dfs."};
+ /**
+ * A component type for a client component
+ */
+ String COMPONENT_TYPE_CLIENT = "client";
/**
* Key for application version. This must be set in app_config/global {@value}
@@ -222,7 +226,6 @@ public interface SliderKeys extends SliderXmlConfKeys {
String SLIDER_JAR = "slider.jar";
String JCOMMANDER_JAR = "jcommander.jar";
String GSON_JAR = "gson.jar";
- String AGENT_TAR = "slider-agent.tar.gz";
String DEFAULT_APP_PKG = "appPkg.zip";
String DEFAULT_JVM_HEAP = "256M";
@@ -288,4 +291,21 @@ public interface SliderKeys extends SliderXmlConfKeys {
String SLIDER_CLASSPATH_EXTRA = "SLIDER_CLASSPATH_EXTRA";
String YARN_CONTAINER_PATH = "/node/container/";
+
+ String GLOBAL_CONFIG_TAG = "global";
+ String SYSTEM_CONFIGS = "system_configs";
+ String JAVA_HOME = "java_home";
+ String TWO_WAY_SSL_ENABLED = "ssl.server.client.auth";
+ String INFRA_RUN_SECURITY_DIR = "infra/run/security/";
+ String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt";
+
+ String AM_CONFIG_GENERATION = "am.config.generation";
+ String APP_CONF_DIR = "app/conf";
+
+ String APP_RESOURCES = "application.resources";
+ String APP_RESOURCES_DIR = "app/resources";
+ String PER_COMPONENT = "per.component";
+ String PER_GROUP = "per.group";
+
+ String APP_PACKAGES_DIR = "app/packages";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
index e9f65ba..f773982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
@@ -183,6 +183,10 @@ public final class SliderUtils {
return !isUnset(s);
}
+ public static boolean isEmpty(List l) {
+ return l == null || l.isEmpty();
+ }
+
/**
* Probe for a list existing and not being empty
* @param l list
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
index 5a3eb3d..aefc0de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
@@ -52,6 +52,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.apache.slider.providers.docker.DockerKeys.DEFAULT_DOCKER_NETWORK;
+
/**
* Launcher of applications: base class
*/
@@ -79,6 +81,7 @@ public abstract class AbstractLauncher extends Configured {
protected LogAggregationContext logAggregationContext;
protected boolean yarnDockerMode = false;
protected String dockerImage;
+ protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
protected String yarnContainerMountPoints;
protected String runPrivilegedContainer;
@@ -232,7 +235,8 @@ public abstract class AbstractLauncher extends Configured {
if(yarnDockerMode){
Map<String, String> env = containerLaunchContext.getEnvironment();
env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
- env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);//if yarnDockerMode, then dockerImage is set
+ env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);
+ env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", dockerNetwork);
env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER", runPrivilegedContainer);
StringBuilder sb = new StringBuilder();
for (Entry<String,String> mount : mountPaths.entrySet()) {
@@ -517,6 +521,10 @@ public abstract class AbstractLauncher extends Configured {
this.dockerImage = dockerImage;
}
+ public void setDockerNetwork(String dockerNetwork) {
+ this.dockerNetwork = dockerNetwork;
+ }
+
public void setYarnContainerMountPoints(String yarnContainerMountPoints) {
this.yarnContainerMountPoints = yarnContainerMountPoints;
}
@@ -525,4 +533,12 @@ public abstract class AbstractLauncher extends Configured {
this.runPrivilegedContainer = runPrivilegedContainer;
}
+ public void setRunPrivilegedContainer(boolean runPrivilegedContainer) {
+ if (runPrivilegedContainer) {
+ this.runPrivilegedContainer = Boolean.toString(true);
+ } else {
+ this.runPrivilegedContainer = Boolean.toString(false);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
index 9bdcfcb..4ec513c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/docstore/PublishedConfigurationOutputter.java
@@ -39,6 +39,8 @@ import java.util.Properties;
*/
public abstract class PublishedConfigurationOutputter {
+ private static final String COMMENTS = "Generated by Apache Slider";
+
protected final PublishedConfiguration owner;
protected PublishedConfigurationOutputter(PublishedConfiguration owner) {
@@ -143,13 +145,13 @@ public abstract class PublishedConfigurationOutputter {
@Override
public void save(OutputStream out) throws IOException {
- properties.store(out, "");
+ properties.store(out, COMMENTS);
}
public String asString() throws IOException {
StringWriter sw = new StringWriter();
- properties.store(sw, "");
+ properties.store(sw, COMMENTS);
return sw.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
index 510de5d..f59c347 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
@@ -216,8 +216,8 @@ public abstract class AbstractClientProvider extends Configured {
* Return a set of application specific string tags.
* @return the set of tags.
*/
- public Set<String> getApplicationTags (SliderFileSystem fileSystem,
- String appDef) throws SliderException {
+ public Set<String> getApplicationTags(SliderFileSystem fileSystem,
+ ConfTreeOperations appConf) throws SliderException {
return Collections.emptySet();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index 92766f5..19fa07b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -139,6 +140,19 @@ public abstract class AbstractProviderService
}
/**
+ * Load default Configuration
+ * @param confDir configuration directory
+ * @return configuration
+ * @throws BadCommandArgumentsException
+ * @throws IOException
+ */
+ @Override
+ public Configuration loadProviderConfigurationInformation(File confDir)
+ throws BadCommandArgumentsException, IOException {
+ return new Configuration(false);
+ }
+
+ /**
* Load a specific XML configuration file for the provider config
* @param confDir configuration directory
* @param siteXMLFilename provider-specific filename
@@ -369,8 +383,6 @@ public abstract class AbstractProviderService
@Override
public void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException {
this.amWebAPI = amWebURI;
@@ -422,4 +434,10 @@ public abstract class AbstractProviderService
public void rebuildContainerDetails(List<Container> liveContainers,
String applicationId, Map<Integer, ProviderRole> providerRoles) {
}
+
+ @Override
+ public boolean processContainerStatus(ContainerId containerId,
+ ContainerStatus status) {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index 3f24665..b62510a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.conf.AggregateConf;
@@ -189,13 +190,9 @@ public interface ProviderService extends ProviderCore,
/**
* Prior to going live -register the initial service registry data
* @param amWebURI URL to the AM. This may be proxied, so use relative paths
- * @param agentOpsURI URI for agent operations. This will not be proxied
- * @param agentStatusURI URI For agent status. Again: no proxy
* @param serviceRecord service record to build up
*/
void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException;
@@ -216,4 +213,11 @@ public interface ProviderService extends ProviderCore,
*/
void rebuildContainerDetails(List<Container> liveContainers,
String applicationId, Map<Integer, ProviderRole> providerRoles);
+
+ /**
+ * Process container status
+ * @return true if status needs to be requested again, false otherwise
+ */
+ boolean processContainerStatus(ContainerId containerId,
+ ContainerStatus status);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c999964a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
index 07d106b..47556f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
@@ -18,16 +18,29 @@
package org.apache.slider.providers;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.slider.api.ClusterDescription;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
+import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
@@ -35,28 +48,50 @@ import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.exceptions.SliderInternalStateException;
+import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.ConfigUtils;
+import org.apache.slider.core.registry.docstore.ExportEntry;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.server.appmaster.state.RoleInstance;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.apache.slider.server.services.security.CertificateManager;
+import org.apache.slider.server.services.security.SecurityStore;
+import org.apache.slider.server.services.security.StoresGenerator;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.LinkedList;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.regex.Pattern;
/**
- * this is a factoring out of methods handy for providers. It's bonded to a log at
- * construction time
+ * This is a factoring out of methods handy for providers. It's bonded to a log
+ * at construction time.
*/
-public class ProviderUtils implements RoleKeys {
+public class ProviderUtils implements RoleKeys, SliderKeys {
protected final Logger log;
/**
- * Create an instace
+ * Create an instance
* @param log log directory to use -usually the provider
*/
@@ -66,14 +101,14 @@ public class ProviderUtils implements RoleKeys {
/**
* Add oneself to the classpath. This does not work
- * on minicluster test runs where the JAR is not built up
+ * on minicluster test runs where the JAR is not built up.
* @param providerResources map of provider resources to add these entries to
* @param provider provider to add
* @param jarName name of the jar to use
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
* @param libdir relative directory to place resources
- * @param miniClusterTestRun
+ * @param miniClusterTestRun true if minicluster is being used
* @return true if the class was found in a JAR
*
* @throws FileNotFoundException if the JAR was not found and this is NOT
@@ -81,7 +116,8 @@ public class ProviderUtils implements RoleKeys {
* @throws IOException IO problems
* @throws SliderException any Slider problem
*/
- public static boolean addProviderJar(Map<String, LocalResource> providerResources,
+ public static boolean addProviderJar(
+ Map<String, LocalResource> providerResources,
Object provider,
String jarName,
SliderFileSystem sliderFileSystem,
@@ -108,13 +144,14 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * Add/overwrite the agent tarball (overwritten every time application is restarted)
- * @param provider
- * @param tarName
- * @param sliderFileSystem
- * @param agentDir
+ * Add/overwrite the agent tarball (overwritten every time application is
+ * restarted).
+ * @param provider an instance of a provider class
+ * @param tarName name of the tarball to upload
+ * @param sliderFileSystem the file system
+ * @param agentDir directory to upload to
* @return true the location could be determined and the file added
- * @throws IOException
+ * @throws IOException if the upload fails
*/
public static boolean addAgentTar(Object provider,
String tarName,
@@ -125,100 +162,58 @@ public class ProviderUtils implements RoleKeys {
if(localFile != null) {
String parentDir = localFile.getParent();
Path agentTarPath = new Path(parentDir, tarName);
- sliderFileSystem.getFileSystem().copyFromLocalFile(false, true, agentTarPath, agentDir);
+ sliderFileSystem.getFileSystem().copyFromLocalFile(false, true,
+ agentTarPath, agentDir);
return true;
}
return false;
}
/**
- * Add a set of dependencies to the provider resources being built up,
- * by copying them from the local classpath to the remote one, then
- * registering them
- * @param providerResources map of provider resources to add these entries to
- * @param sliderFileSystem target filesystem
- * @param tempPath path in the cluster FS for temp files
- * @param libdir relative directory to place resources
- * @param resources list of resource names (e.g. "hbase.jar"
- * @param classes list of classes where classes[i] refers to a class in
- * resources[i]
- * @throws IOException IO problems
- * @throws SliderException any Slider problem
- */
- public static void addDependencyJars(Map<String, LocalResource> providerResources,
- SliderFileSystem sliderFileSystem,
- Path tempPath,
- String libdir,
- String[] resources,
- Class[] classes
- ) throws
- IOException,
- SliderException {
- if (resources.length != classes.length) {
- throw new SliderInternalStateException(
- "mismatch in Jar names [%d] and classes [%d]",
- resources.length,
- classes.length);
- }
- int size = resources.length;
- for (int i = 0; i < size; i++) {
- String jarName = resources[i];
- Class clazz = classes[i];
- SliderUtils.putJar(providerResources,
- sliderFileSystem,
- clazz,
- tempPath,
- libdir,
- jarName);
- }
-
- }
-
- /**
- * Loads all dependency jars from the default path
+ * Loads all dependency jars from the default path.
* @param providerResources map of provider resources to add these entries to
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
* @param libDir relative directory to place resources
* @param libLocalSrcDir explicitly supplied local libs dir
- * @throws IOException
- * @throws SliderException
- */
- public static void addAllDependencyJars(Map<String, LocalResource> providerResources,
- SliderFileSystem sliderFileSystem,
- Path tempPath,
- String libDir,
- String libLocalSrcDir)
+ * @throws IOException trouble copying to HDFS
+ * @throws SliderException trouble copying to HDFS
+ */
+ public static void addAllDependencyJars(
+ Map<String, LocalResource> providerResources,
+ SliderFileSystem sliderFileSystem,
+ Path tempPath,
+ String libDir,
+ String libLocalSrcDir)
throws IOException, SliderException {
- String libSrcToUse = libLocalSrcDir;
if (SliderUtils.isSet(libLocalSrcDir)) {
File file = new File(libLocalSrcDir);
if (!file.exists() || !file.isDirectory()) {
- throw new BadCommandArgumentsException("Supplied lib src dir %s is not valid", libLocalSrcDir);
+ throw new BadCommandArgumentsException(
+ "Supplied lib src dir %s is not valid", libLocalSrcDir);
}
}
- SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, libDir, libSrcToUse);
+ SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath,
+ libDir, libLocalSrcDir);
}
+
/**
- * build the log directory
- * @return the log dir
+ * Validate the requested number of instances of a component.
+ * <p>
+ * If max <= 0: min <= count
+ * If max > 0: min <= count <= max
+ * @param instanceDescription configuration
+ * @param name node class name
+ * @param min minimum permitted number of instances
+ * @param max maximum permitted number of instances; ignored if <= 0
+ * @throws BadCommandArgumentsException if the values are out of range
*/
- public String getLogdir() throws IOException {
- String logdir = System.getenv("LOGDIR");
- if (logdir == null) {
- logdir =
- SliderKeys.TMP_LOGDIR_PREFIX + UserGroupInformation.getCurrentUser().getShortUserName();
- }
- return logdir;
- }
-
-
public void validateNodeCount(AggregateConf instanceDescription,
- String name, int min, int max) throws
- BadCommandArgumentsException {
+ String name, int min, int max)
+ throws BadCommandArgumentsException {
MapOperations component =
- instanceDescription.getResourceOperations().getComponent(name);
+ instanceDescription.getResourceOperations().getComponent(name);
int count;
if (component == null) {
count = 0;
@@ -229,7 +224,7 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * Validate the node count and heap size values of a node class
+ * Validate the count is between min and max.
* <p>
* If max <= 0: min <= count
* If max > 0: min <= count <= max
@@ -256,33 +251,36 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * copy all options beginning site. into the site.xml
- * @param clusterSpec cluster specification
- * @param sitexml map for XML file to build up
+ * Copy options beginning with "site.configName." prefix from options map
+ * to sitexml map, removing the prefix and substituting the tokens
+ * specified in the tokenMap.
+ * @param options source map
+ * @param sitexml destination map
+ * @param configName optional ".configName" portion of the prefix
+ * @param tokenMap key/value pairs to substitute into the option values
*/
- public void propagateSiteOptions(ClusterDescription clusterSpec,
- Map<String, String> sitexml) {
- Map<String, String> options = clusterSpec.options;
- propagateSiteOptions(options, sitexml);
- }
-
- public void propagateSiteOptions(Map<String, String> options,
- Map<String, String> sitexml) {
- propagateSiteOptions(options, sitexml, "");
- }
-
public void propagateSiteOptions(Map<String, String> options,
- Map<String, String> sitexml,
- String configName) {
- propagateSiteOptions(options, sitexml, configName, null);
+ Map<String, String> sitexml,
+ String configName,
+ Map<String,String> tokenMap) {
+ String prefix = OptionKeys.SITE_XML_PREFIX +
+ (!configName.isEmpty() ? configName + "." : "");
+ propagateOptions(options, sitexml, tokenMap, prefix);
}
- public void propagateSiteOptions(Map<String, String> options,
+ /**
+ * Copy options beginning with prefix from options map
+ * to sitexml map, removing the prefix and substituting the tokens
+ * specified in the tokenMap.
+ * @param options source map
+ * @param sitexml destination map
+ * @param tokenMap key/value pairs to substitute into the option values
+ * @param prefix which options to copy to destination map
+ */
+ public void propagateOptions(Map<String, String> options,
Map<String, String> sitexml,
- String configName,
- Map<String,String> tokenMap) {
- String prefix = OptionKeys.SITE_XML_PREFIX +
- (!configName.isEmpty() ? configName + "." : "");
+ Map<String,String> tokenMap,
+ String prefix) {
for (Map.Entry<String, String> entry : options.entrySet()) {
String key = entry.getKey();
if (key.startsWith(prefix)) {
@@ -302,229 +300,1038 @@ public class ProviderUtils implements RoleKeys {
}
/**
- * Propagate an option from the cluster specification option map
- * to the site XML map, using the site key for the name
- * @param global global config spec
- * @param optionKey key in the option map
- * @param sitexml map for XML file to build up
- * @param siteKey key to assign the value to in the site XML
- * @throws BadConfigException if the option is missing from the cluster spec
- */
- public void propagateOption(MapOperations global,
- String optionKey,
- Map<String, String> sitexml,
- String siteKey) throws BadConfigException {
- sitexml.put(siteKey, global.getMandatoryOption(optionKey));
- }
-
-
- /**
- * Build the image dir. This path is relative and only valid at the far end
- * @param instanceDefinition instance definition
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory*
- */
- public String buildPathToHomeDir(AggregateConf instanceDefinition,
- String bindir,
- String script) throws
- FileNotFoundException,
- BadConfigException {
- MapOperations globalOptions =
- instanceDefinition.getInternalOperations().getGlobalOptions();
- String applicationHome =
- globalOptions.get(InternalKeys.INTERNAL_APPLICATION_HOME);
- String imagePath =
- globalOptions.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
- return buildPathToHomeDir(imagePath, applicationHome, bindir, script);
- }
-
- public String buildPathToHomeDir(String imagePath,
- String applicationHome,
- String bindir, String script) throws
- FileNotFoundException {
- String path;
- File scriptFile;
- if (imagePath != null) {
- File tarball = new File(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR);
- scriptFile = findBinScriptInExpandedArchive(tarball, bindir, script);
- // now work back from the script to build the relative path
- // to the binary which will be valid remote or local
- StringBuilder builder = new StringBuilder();
- builder.append(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR);
- builder.append("/");
- //for the script, we want the name of ../..
- File archive = scriptFile.getParentFile().getParentFile();
- builder.append(archive.getName());
- path = builder.toString();
+ * Substitute tokens into option map values, returning a new map.
+ * @param options source map
+ * @param tokenMap key/value pairs to substitute into the option values
+ * @return map with substituted values
+ */
+ public Map<String, String> filterSiteOptions(Map<String, String> options,
+ Map<String, String> tokenMap) {
+ String prefix = OptionKeys.SITE_XML_PREFIX;
+ Map<String, String> filteredOptions = new HashMap<>();
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ String value = entry.getValue();
+ if (tokenMap != null) {
+ for (Map.Entry<String,String> token : tokenMap.entrySet()) {
+ value = value.replaceAll(Pattern.quote(token.getKey()),
+ token.getValue());
+ }
+ }
+ filteredOptions.put(key, value);
+ }
+ }
+ return filteredOptions;
+ }
+
+ /**
+ * Get resource requirements from a String value. If value isn't specified,
+ * use the default value. If value is greater than max, use the max value.
+ * @param val string value
+ * @param defVal default value
+ * @param maxVal maximum value
+ * @return int resource requirement
+ */
+ public int getRoleResourceRequirement(String val,
+ int defVal,
+ int maxVal) {
+ if (val==null) {
+ val = Integer.toString(defVal);
+ }
+ Integer intVal;
+ if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) {
+ intVal = maxVal;
+ } else {
+ intVal = Integer.decode(val);
+ }
+ return intVal;
+ }
+
+ /**
+ * Localize the service keytabs for the application.
+ * @param launcher container launcher
+ * @param instanceDefinition app specification
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws IOException trouble uploading to HDFS
+ */
+ public void localizeServiceKeytabs(ContainerLauncher launcher,
+ AggregateConf instanceDefinition, SliderFileSystem fileSystem,
+ String clusterName) throws IOException {
+ ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+ String keytabPathOnHost = appConf.getComponent(COMPONENT_AM).get(
+ SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+ if (SliderUtils.isUnset(keytabPathOnHost)) {
+ String amKeytabName = appConf.getComponent(COMPONENT_AM).get(
+ SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+ String keytabDir = appConf.getComponent(COMPONENT_AM).get(
+ SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
+ // we need to localize the keytab files in the directory
+ Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
+ clusterName);
+ boolean serviceKeytabsDeployed = false;
+ if (fileSystem.getFileSystem().exists(keytabDirPath)) {
+ FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(
+ keytabDirPath);
+ LocalResource keytabRes;
+ for (FileStatus keytab : keytabs) {
+ if (!amKeytabName.equals(keytab.getPath().getName())
+ && keytab.getPath().getName().endsWith(".keytab")) {
+ serviceKeytabsDeployed = true;
+ log.info("Localizing keytab {}", keytab.getPath().getName());
+ keytabRes = fileSystem.createAmResource(keytab.getPath(),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(KEYTAB_DIR + "/" +
+ keytab.getPath().getName(),
+ keytabRes);
+ }
+ }
+ }
+ if (!serviceKeytabsDeployed) {
+ log.warn("No service keytabs for the application have been localized. "
+ + "If the application requires keytabs for secure operation, "
+ + "please ensure that the required keytabs have been uploaded "
+ + "to the folder {}", keytabDirPath);
+ }
+ }
+ }
+
+ /**
+ * Return whether two-way SSL is enabled for Agent / AM communication.
+ * @param amComponent component specification
+ * @return true if enabled
+ */
+ public boolean hasTwoWaySSLEnabled(MapOperations amComponent) {
+ return amComponent != null ?
+ amComponent.getOptionBool(TWO_WAY_SSL_ENABLED, false) : false;
+ }
+
+ /**
+ * Generate and localize SSL certs for Agent / AM communication
+ * @param launcher container launcher
+ * @param container allocated container information
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws SliderException certs cannot be generated/uploaded
+ */
+ public void localizeContainerSSLResources(ContainerLauncher launcher,
+ Container container, SliderFileSystem fileSystem, String clusterName)
+ throws SliderException {
+ try {
+ // localize server cert
+ Path certsDir = fileSystem.buildClusterSecurityDirPath(clusterName);
+ LocalResource certResource = fileSystem.createAmResource(
+ new Path(certsDir, CRT_FILE_NAME),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(CERT_FILE_LOCALIZATION_PATH, certResource);
+
+ // generate and localize agent cert
+ CertificateManager certMgr = new CertificateManager();
+ String hostname = container.getNodeId().getHost();
+ String containerId = container.getId().toString();
+ certMgr.generateContainerCertificate(hostname, containerId);
+ LocalResource agentCertResource = fileSystem.createAmResource(
+ uploadSecurityResource(
+ CertificateManager.getAgentCertficateFilePath(containerId),
+ fileSystem, clusterName), LocalResourceType.FILE);
+ // still using hostname as file name on the agent side, but the files
+ // do end up under the specific container's file space
+ launcher.addLocalResource(INFRA_RUN_SECURITY_DIR + hostname +
+ ".crt", agentCertResource);
+ LocalResource agentKeyResource = fileSystem.createAmResource(
+ uploadSecurityResource(
+ CertificateManager.getAgentKeyFilePath(containerId), fileSystem,
+ clusterName),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(INFRA_RUN_SECURITY_DIR + hostname +
+ ".key", agentKeyResource);
+
+ } catch (Exception e) {
+ throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e,
+ "Unable to localize certificates. Two-way SSL cannot be enabled");
+ }
+ }
+
+ /**
+ * Upload a local file to the cluster security dir in HDFS. If the file
+ * already exists, it is not replaced.
+ * @param resource file to upload
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @return Path of the uploaded file
+ * @throws IOException file cannot be uploaded
+ */
+ private Path uploadSecurityResource(File resource,
+ SliderFileSystem fileSystem, String clusterName) throws IOException {
+ Path certsDir = fileSystem.buildClusterSecurityDirPath(clusterName);
+ return uploadResource(resource, fileSystem, certsDir);
+ }
+
+ /**
+ * Upload a local file to the cluster resources dir in HDFS. If the file
+ * already exists, it is not replaced.
+ * @param resource file to upload
+ * @param fileSystem file system
+ * @param roleName optional subdirectory (for component-specific resources)
+ * @param clusterName app name
+ * @return Path of the uploaded file
+ * @throws IOException file cannot be uploaded
+ */
+ private Path uploadResource(File resource, SliderFileSystem fileSystem,
+ String roleName, String clusterName) throws IOException {
+ Path dir;
+ if (roleName == null) {
+ dir = fileSystem.buildClusterResourcePath(clusterName);
+ } else {
+ dir = fileSystem.buildClusterResourcePath(clusterName, roleName);
+ }
+ return uploadResource(resource, fileSystem, dir);
+ }
+ /**
+ * Upload a local file to a specified HDFS directory. If the file already
+ * exists, it is not replaced.
+ * @param resource file to upload
+ * @param fileSystem file system
+ * @param parentDir destination directory in HDFS
+ * @return Path of the uploaded file
+ * @throws IOException file cannot be uploaded
+ */
+ private synchronized Path uploadResource(File resource,
+ SliderFileSystem fileSystem, Path parentDir) throws IOException {
+ if (!fileSystem.getFileSystem().exists(parentDir)) {
+ fileSystem.getFileSystem().mkdirs(parentDir,
+ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+ }
+ Path destPath = new Path(parentDir, resource.getName());
+ if (!fileSystem.getFileSystem().exists(destPath)) {
+ FSDataOutputStream os = null;
+ try {
+ os = fileSystem.getFileSystem().create(destPath);
+ byte[] contents = FileUtils.readFileToByteArray(resource);
+ os.write(contents, 0, contents.length);
+ os.flush();
+ } finally {
+ IOUtils.closeStream(os);
+ }
+ log.info("Uploaded {} to localization path {}", resource, destPath);
} else {
- // using a home directory which is required to be present on
- // the local system -so will be absolute and resolvable
- File homedir = new File(applicationHome);
- path = homedir.getAbsolutePath();
+ log.info("Resource {} already existed at localization path {}", resource,
+ destPath);
+ }
- //this is absolute, resolve its entire path
- SliderUtils.verifyIsDir(homedir, log);
- File bin = new File(homedir, bindir);
- SliderUtils.verifyIsDir(bin, log);
- scriptFile = new File(bin, script);
- SliderUtils.verifyFileExists(scriptFile, log);
+ while (!fileSystem.getFileSystem().exists(destPath)) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- return path;
+
+ fileSystem.getFileSystem().setPermission(destPath,
+ new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
+
+ return destPath;
}
-
/**
- * Build the image dir. This path is relative and only valid at the far end
- * @param instance instance options
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory*
- */
- public String buildPathToScript(AggregateConf instance,
- String bindir,
- String script) throws FileNotFoundException {
- return buildPathToScript(instance.getInternalOperations(), bindir, script);
- }
- /**
- * Build the image dir. This path is relative and only valid at the far end
- * @param internal internal options
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory*
- */
- public String buildPathToScript(ConfTreeOperations internal,
- String bindir,
- String script) throws FileNotFoundException {
-
- String homedir = buildPathToHomeDir(
- internal.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH),
- internal.get(InternalKeys.INTERNAL_APPLICATION_HOME),
- bindir,
- script);
- return buildScriptPath(bindir, script, homedir);
+ * Write a configuration property map to a local file in a specified format.
+ * @param fileSystem file system
+ * @param file destination file
+ * @param configFormat file format
+ * @param configFileDN file description
+ * @param config properties to save to the file
+ * @param clusterName app name
+ * @throws IOException file cannot be created
+ */
+ private void createConfigFile(SliderFileSystem fileSystem, File file,
+ ConfigFormat configFormat, String configFileDN,
+ Map<String, String> config, String clusterName) throws IOException {
+ log.info("Writing {} file {}", configFormat, file);
+
+ ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
+ fileSystem, clusterName, file.getName());
+ PublishedConfiguration publishedConfiguration =
+ new PublishedConfiguration(configFileDN,
+ config.entrySet());
+ PublishedConfigurationOutputter configurationOutputter =
+ PublishedConfigurationOutputter.createOutputter(configFormat,
+ publishedConfiguration);
+ configurationOutputter.save(file);
}
-
-
- public String buildScriptPath(String bindir, String script, String homedir) {
- StringBuilder builder = new StringBuilder(homedir);
- builder.append("/");
- builder.append(bindir);
- builder.append("/");
- builder.append(script);
- return builder.toString();
+ /**
+ * Determine config files requested in the appConf, generate the files, and
+ * localize them.
+ * @param launcher container launcher
+ * @param roleName component name
+ * @param roleGroup component group
+ * @param appConf app configurations
+ * @param configs configurations grouped by config name
+ * @param env environment variables
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws IOException file(s) cannot be uploaded
+ * @throws BadConfigException file name not specified or file format not
+ * supported
+ */
+ public void localizeConfigFiles(ContainerLauncher launcher,
+ String roleName, String roleGroup,
+ ConfTreeOperations appConf,
+ Map<String, Map<String, String>> configs,
+ MapOperations env,
+ SliderFileSystem fileSystem,
+ String clusterName)
+ throws IOException, BadConfigException {
+ for (Entry<String, Map<String, String>> configEntry : configs.entrySet()) {
+ String configFileName = appConf.getComponentOpt(roleGroup,
+ OptionKeys.CONF_FILE_PREFIX + configEntry.getKey() + OptionKeys
+ .NAME_SUFFIX, null);
+ String configFileType = appConf.getComponentOpt(roleGroup,
+ OptionKeys.CONF_FILE_PREFIX + configEntry.getKey() + OptionKeys
+ .TYPE_SUFFIX, null);
+ if (configFileName == null && configFileType == null) {
+ // config file not requested, so continue
+ continue;
+ }
+ if (configFileName == null) {
+ throw new BadConfigException("Config file name null for " +
+ configEntry.getKey());
+ }
+ if (configFileType == null) {
+ throw new BadConfigException("Config file type null for " +
+ configEntry.getKey());
+ }
+ ConfigFormat configFormat = ConfigFormat.resolve(configFileType);
+ if (configFormat == null) {
+ throw new BadConfigException("Config format " + configFormat +
+ " doesn't exist");
+ }
+ localizeConfigFile(launcher, roleName, roleGroup, configEntry.getKey(),
+ configFormat, configFileName, configs, env, fileSystem, clusterName);
+ }
}
+ /**
+ * Create and localize a config file.
+ * @param launcher container launcher
+ * @param roleName component name
+ * @param roleGroup component group
+ * @param configFileDN config description/name
+ * @param configFormat config format
+ * @param configFileName config file name
+ * @param configs configs grouped by config description/name
+ * @param env environment variables
+ * @param fileSystem file system
+ * @param clusterName app name
+ * @throws IOException file cannot be uploaded
+ */
+ public void localizeConfigFile(ContainerLauncher launcher,
+ String roleName, String roleGroup,
+ String configFileDN, ConfigFormat configFormat, String configFileName,
+ Map<String, Map<String, String>> configs,
+ MapOperations env,
+ SliderFileSystem fileSystem,
+ String clusterName)
+ throws IOException {
+ if (launcher == null) {
+ return;
+ }
+ Map<String, String> config = ConfigUtils.replacePropsInConfig(
+ configs.get(configFileDN), env.options);
+ String fileName = ConfigUtils.replaceProps(config, configFileName);
+ File localFile = new File(RESOURCE_DIR);
+ if (!localFile.exists()) {
+ if (!localFile.mkdir()) {
+ throw new IOException(RESOURCE_DIR + " could not be created!");
+ }
+ }
+ localFile = new File(localFile, new File(fileName).getName());
- public static String convertToAppRelativePath(File file) {
- return convertToAppRelativePath(file.getPath());
+ String folder = null;
+ if ("true".equals(config.get(PER_COMPONENT))) {
+ folder = roleName;
+ } else if ("true".equals(config.get(PER_GROUP))) {
+ folder = roleGroup;
+ }
+
+ log.info("Localizing {} configs to config file {} (destination {}) " +
+ "based on {} configs", config.size(), localFile, fileName,
+ configFileDN);
+ createConfigFile(fileSystem, localFile, configFormat, configFileDN, config,
+ clusterName);
+ Path destPath = uploadResource(localFile, fileSystem, folder, clusterName);
+ LocalResource configResource = fileSystem.createAmResource(destPath,
+ LocalResourceType.FILE);
+
+ File destFile = new File(fileName);
+ if (destFile.isAbsolute()) {
+ launcher.addLocalResource(
+ RESOURCE_DIR + "/" + destFile.getName(),
+ configResource, fileName);
+ } else {
+ launcher.addLocalResource(APP_CONF_DIR + "/" + fileName,
+ configResource);
+ }
}
- public static String convertToAppRelativePath(String path) {
- return ApplicationConstants.Environment.PWD.$() + "/" + path;
+ /**
+ * Generate and localize security stores requested by the app. Also perform
+ * last-minute substitution of cluster name into credentials strings.
+ * @param launcher container launcher
+ * @param container allocated container information
+ * @param role component name
+ * @param fileSystem file system
+ * @param instanceDefinition app specification
+ * @param compOps component specification
+ * @param clusterName app name
+ * @throws SliderException stores cannot be generated/uploaded
+ * @throws IOException stores cannot be generated/uploaded
+ */
+ public void localizeContainerSecurityStores(ContainerLauncher launcher,
+ Container container,
+ String role,
+ SliderFileSystem fileSystem,
+ AggregateConf instanceDefinition,
+ MapOperations compOps,
+ String clusterName)
+ throws SliderException, IOException {
+ // substitute CLUSTER_NAME into credentials
+ Map<String,List<String>> newcred = new HashMap<>();
+ for (Entry<String,List<String>> entry :
+ instanceDefinition.getAppConf().credentials.entrySet()) {
+ List<String> resultList = new ArrayList<>();
+ for (String v : entry.getValue()) {
+ resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"),
+ clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
+ clusterName));
+ }
+ newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"),
+ clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
+ clusterName),
+ resultList);
+ }
+ instanceDefinition.getAppConf().credentials = newcred;
+
+ // generate and localize security stores
+ SecurityStore[] stores = generateSecurityStores(container, role,
+ instanceDefinition, compOps);
+ for (SecurityStore store : stores) {
+ LocalResource keystoreResource = fileSystem.createAmResource(
+ uploadSecurityResource(store.getFile(), fileSystem, clusterName),
+ LocalResourceType.FILE);
+ launcher.addLocalResource(String.format("secstores/%s-%s.p12",
+ store.getType(), role),
+ keystoreResource);
+ }
+ }
+
+ /**
+ * Generate security stores requested by the app.
+ * @param container allocated container information
+ * @param role component name
+ * @param instanceDefinition app specification
+ * @param compOps component specification
+ * @return security stores
+ * @throws SliderException stores cannot be generated
+ * @throws IOException stores cannot be generated
+ */
+ private SecurityStore[] generateSecurityStores(Container container,
+ String role,
+ AggregateConf instanceDefinition,
+ MapOperations compOps)
+ throws SliderException, IOException {
+ return StoresGenerator.generateSecurityStores(
+ container.getNodeId().getHost(), container.getId().toString(),
+ role, instanceDefinition, compOps);
}
+ /**
+ * Return whether security stores are requested by the app.
+ * @param compOps component specification
+ * @return true if stores are requested
+ */
+ public boolean areStoresRequested(MapOperations compOps) {
+ return compOps != null ? compOps.
+ getOptionBool(COMP_STORES_REQUIRED_KEY, false) : false;
+ }
- public static void validatePathReferencesLocalDir(String meaning, String path)
- throws BadConfigException {
- File file = new File(path);
- if (!file.exists()) {
- throw new BadConfigException("%s directory %s not found", meaning, file);
+ /**
+ * Localize application tarballs and other resources requested by the app.
+ * @param launcher container launcher
+ * @param fileSystem file system
+ * @param appConf app configurations
+ * @param roleGroup component group
+ * @param clusterName app name
+ * @throws IOException resources cannot be uploaded
+ * @throws BadConfigException package name or type is not specified
+ */
+ public void localizePackages(ContainerLauncher launcher,
+ SliderFileSystem fileSystem, ConfTreeOperations appConf, String roleGroup,
+ String clusterName) throws IOException, BadConfigException {
+ for (Entry<String, Map<String, String>> pkg :
+ getPackages(roleGroup, appConf).entrySet()) {
+ String pkgName = pkg.getValue().get(OptionKeys.NAME_SUFFIX);
+ String pkgType = pkg.getValue().get(OptionKeys.TYPE_SUFFIX);
+ Path pkgPath = fileSystem.buildResourcePath(pkgName);
+ if (!fileSystem.isFile(pkgPath)) {
+ pkgPath = fileSystem.buildResourcePath(clusterName,
+ pkgName);
+ }
+ if (!fileSystem.isFile(pkgPath)) {
+ throw new IOException("Package doesn't exist as a resource: " +
+ pkgName);
+ }
+ log.info("Adding resource {}", pkgName);
+ LocalResourceType type = LocalResourceType.FILE;
+ if ("archive".equals(pkgType)) {
+ type = LocalResourceType.ARCHIVE;
+ }
+ LocalResource packageResource = fileSystem.createAmResource(
+ pkgPath, type);
+ launcher.addLocalResource(APP_PACKAGES_DIR, packageResource);
}
- if (!file.isDirectory()) {
- throw new BadConfigException("%s is not a directory: %s", meaning, file);
+ }
+
+ /**
+ * Build a map of configuration description/name to configuration key/value
+ * properties, with all known tokens substituted into the property values.
+ * @param appConf app configurations
+ * @param internalsConf internal configurations
+ * @param containerId container ID
+ * @param roleName component name
+ * @param roleGroup component group
+ * @param amState access to AM state
+ * @return configuration properties grouped by config description/name
+ */
+ public Map<String, Map<String, String>> buildConfigurations(
+ ConfTreeOperations appConf, ConfTreeOperations internalsConf,
+ String containerId, String roleName, String roleGroup,
+ StateAccessForProviders amState) {
+
+ Map<String, Map<String, String>> configurations = new TreeMap<>();
+ Map<String, String> tokens = getStandardTokenMap(appConf,
+ internalsConf, roleName, roleGroup, containerId);
+
+ Set<String> configs = new HashSet<>();
+ configs.addAll(getApplicationConfigurationTypes(roleGroup, appConf));
+ configs.addAll(getSystemConfigurationsRequested(appConf));
+
+ for (String configType : configs) {
+ addNamedConfiguration(configType, appConf.getGlobalOptions().options,
+ configurations, tokens, amState);
+ if (appConf.getComponent(roleGroup) != null) {
+ addNamedConfiguration(configType,
+ appConf.getComponent(roleGroup).options, configurations, tokens,
+ amState);
+ }
}
+
+ //do a final replacement of re-used configs
+ dereferenceAllConfigs(configurations);
+
+ return configurations;
}
/**
- * get the user name
- * @return the user name
+ * Substitute "site." prefixed configuration values into other configuration
+ * values where needed. The format for these substitutions is that
+ * {@literal ${@//site/configDN/key}} will be replaced by the value for the
+ * "site.configDN.key" property.
+ * @param configurations configuration properties grouped by config
+ * description/name
*/
- public String getUserName() throws IOException {
- return UserGroupInformation.getCurrentUser().getShortUserName();
+ public void dereferenceAllConfigs(
+ Map<String, Map<String, String>> configurations) {
+ Map<String, String> allConfigs = new HashMap<>();
+ String lookupFormat = "${@//site/%s/%s}";
+ for (String configType : configurations.keySet()) {
+ Map<String, String> configBucket = configurations.get(configType);
+ for (String configName : configBucket.keySet()) {
+ allConfigs.put(String.format(lookupFormat, configType, configName),
+ configBucket.get(configName));
+ }
+ }
+
+ boolean finished = false;
+ while (!finished) {
+ finished = true;
+ for (Map.Entry<String, String> entry : allConfigs.entrySet()) {
+ String configValue = entry.getValue();
+ for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
+ String lookUpValue = lookUpEntry.getValue();
+ if (lookUpValue.contains("${@//site/")) {
+ continue;
+ }
+ String lookUpKey = lookUpEntry.getKey();
+ if (configValue != null && configValue.contains(lookUpKey)) {
+ configValue = configValue.replace(lookUpKey, lookUpValue);
+ }
+ }
+ if (!configValue.equals(entry.getValue())) {
+ finished = false;
+ allConfigs.put(entry.getKey(), configValue);
+ }
+ }
+ }
+
+ for (String configType : configurations.keySet()) {
+ Map<String, String> configBucket = configurations.get(configType);
+ for (Map.Entry<String, String> entry: configBucket.entrySet()) {
+ String configName = entry.getKey();
+ String configValue = entry.getValue();
+ for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
+ String lookUpValue = lookUpEntry.getValue();
+ if (lookUpValue.contains("${@//site/")) {
+ continue;
+ }
+ String lookUpKey = lookUpEntry.getKey();
+ if (configValue != null && configValue.contains(lookUpKey)) {
+ configValue = configValue.replace(lookUpKey, lookUpValue);
+ }
+ }
+ configBucket.put(configName, configValue);
+ }
+ }
}
/**
- * Find a script in an expanded archive
- * @param base base directory
- * @param bindir bin subdir
- * @param script script in bin subdir
- * @return the path to the script
- * @throws FileNotFoundException if a file is not found, or it is not a directory
+ * Return a set of configuration description/names represented in the app
+ * configuration.
+ * @param roleGroup component group
+ * @param appConf app configurations
+ * @return set of configuration description/names
*/
- public File findBinScriptInExpandedArchive(File base,
- String bindir,
- String script)
- throws FileNotFoundException {
-
- SliderUtils.verifyIsDir(base, log);
- File[] ls = base.listFiles();
- if (ls == null) {
- //here for the IDE to be happy, as the previous check will pick this case
- throw new FileNotFoundException("Failed to list directory " + base);
+ public Set<String> getApplicationConfigurationTypes(String roleGroup,
+ ConfTreeOperations appConf) {
+ Set<String> configList = new HashSet<>();
+
+ String prefix = OptionKeys.CONF_FILE_PREFIX;
+ String suffix = OptionKeys.TYPE_SUFFIX;
+ MapOperations component = appConf.getComponent(roleGroup);
+ if (component != null) {
+ addConfsToList(component, configList, prefix, suffix);
}
+ addConfsToList(appConf.getGlobalOptions(), configList, prefix, suffix);
- log.debug("Found {} entries in {}", ls.length, base);
- List<File> directories = new LinkedList<File>();
- StringBuilder dirs = new StringBuilder();
- for (File file : ls) {
- log.debug("{}", false);
- if (file.isDirectory()) {
- directories.add(file);
- dirs.append(file.getPath()).append(" ");
+ return configList;
+ }
+
+ /**
+ * Finds all configuration description/names of the form
+ * prefix + configDN + suffix in the configuration (e.g. conf.configDN.type).
+ * @param confMap configuration properties
+ * @param confList set containing configuration description/names
+ * @param prefix configuration key prefix to match
+ * @param suffix configuration key suffix to match
+ */
+ private void addConfsToList(Map<String, String> confMap,
+ Set<String> confList, String prefix, String suffix) {
+ for (String key : confMap.keySet()) {
+ if (key.startsWith(prefix) && key.endsWith(suffix)) {
+ String confName = key.substring(prefix.length(),
+ key.length() - suffix.length());
+ if (!confName.isEmpty()) {
+ confList.add(confName);
+ }
}
}
- if (directories.size() > 1) {
- throw new FileNotFoundException(
- "Too many directories in archive to identify binary: " + dirs);
+ }
+
+ /**
+ * Build a map of package description/name to package key/value properties
+ * (there should be two properties, type and name).
+ * @param roleGroup component group
+ * @param appConf app configurations
+ * @return map of package description/name to package key/value properties
+ * @throws BadConfigException package name or type is not specified
+ */
+ public Map<String, Map<String, String>> getPackages(String roleGroup,
+ ConfTreeOperations appConf) throws BadConfigException {
+ Map<String, Map<String, String>> packages = new HashMap<>();
+ String prefix = OptionKeys.PKG_FILE_PREFIX;
+ String typeSuffix = OptionKeys.TYPE_SUFFIX;
+ String nameSuffix = OptionKeys.NAME_SUFFIX;
+ MapOperations component = appConf.getComponent(roleGroup);
+ if (component == null) {
+ component = appConf.getGlobalOptions();
+ }
+ for (Map.Entry<String, String> entry : component.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ String confName;
+ String type;
+ if (key.endsWith(typeSuffix)) {
+ confName = key.substring(prefix.length(), key.length() - typeSuffix.length());
+ type = typeSuffix;
+ } else if (key.endsWith(nameSuffix)) {
+ confName = key.substring(prefix.length(), key.length() - nameSuffix.length());
+ type = nameSuffix;
+ } else {
+ continue;
+ }
+ if (!packages.containsKey(confName)) {
+ packages.put(confName, new HashMap<String, String>());
+ }
+ packages.get(confName).put(type, entry.getValue());
+ }
}
- if (directories.isEmpty()) {
- throw new FileNotFoundException(
- "No directory found in archive " + base);
+
+ for (Entry<String, Map<String, String>> pkg : packages.entrySet()) {
+ if (!pkg.getValue().containsKey(OptionKeys.TYPE_SUFFIX)) {
+ throw new BadConfigException("Package " + pkg.getKey() + " doesn't " +
+ "have a package type");
+ }
+ if (!pkg.getValue().containsKey(OptionKeys.NAME_SUFFIX)) {
+ throw new BadConfigException("Package " + pkg.getKey() + " doesn't " +
+ "have a package name");
+ }
}
- File archive = directories.get(0);
- File bin = new File(archive, bindir);
- SliderUtils.verifyIsDir(bin, log);
- File scriptFile = new File(bin, script);
- SliderUtils.verifyFileExists(scriptFile, log);
- return scriptFile;
+
+ return packages;
}
/**
- * Return any additional arguments (argv) to provide when starting this role
- *
- * @param roleOptions
- * The options for this role
- * @return A non-null String which contains command line arguments for this role, or the empty string.
+ * Return system configurations requested by the app.
+ * @param appConf app configurations
+ * @return set of system configurations
*/
- public static String getAdditionalArgs(Map<String,String> roleOptions) {
- if (roleOptions.containsKey(RoleKeys.ROLE_ADDITIONAL_ARGS)) {
- String additionalArgs = roleOptions.get(RoleKeys.ROLE_ADDITIONAL_ARGS);
- if (null != additionalArgs) {
- return additionalArgs;
+ public Set<String> getSystemConfigurationsRequested(
+ ConfTreeOperations appConf) {
+ Set<String> configList = new HashSet<>();
+
+ // SYSTEM_CONFIGS is a comma-separated list; entries are trimmed of
+ // surrounding whitespace before being added
+ String configTypes = appConf.get(SYSTEM_CONFIGS);
+ if (configTypes != null && configTypes.length() > 0) {
+ String[] configs = configTypes.split(",");
+ for (String config : configs) {
+ configList.add(config.trim());
 }
 }
- return "";
+ return configList;
 }
-
- public int getRoleResourceRequirement(String val,
- int defVal,
- int maxVal) {
- if (val==null) {
- val = Integer.toString(defVal);
+
+ /**
+ * For a given config description/name, pull out its site configs from the
+ * source config map, remove the site.configDN. prefix from them, and place
+ * them into a new config map using the {@link #propagateSiteOptions} method
+ * (with tokens substituted). This new k/v map is put as the value for the
+ * configDN key in the configurations map.
+ * Note: the tokens map passed in is mutated — role host tokens are added
+ * to it by {@link #addRoleRelatedTokens}.
+ * @param configName config description/name
+ * @param sourceConfig config containing site.* properties
+ * @param configurations configuration map to be populated
+ * @param tokens initial substitution tokens
+ * @param amState access to AM state
+ */
+ private void addNamedConfiguration(String configName,
+ Map<String, String> sourceConfig,
+ Map<String, Map<String, String>> configurations,
+ Map<String, String> tokens, StateAccessForProviders amState) {
+ Map<String, String> config = new HashMap<>();
+ if (configName.equals(GLOBAL_CONFIG_TAG)) {
+ addDefaultGlobalConfig(config);
}
- Integer intVal;
- if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) {
- intVal = maxVal;
+ // add role hosts to tokens
+ addRoleRelatedTokens(tokens, amState);
+ propagateSiteOptions(sourceConfig, config, configName, tokens);
+
+ configurations.put(configName, config);
+ }
+
+ /**
+ * Get initial token map to be substituted into config values.
+ * @param appConf app configurations
+ * @param internals internal configurations
+ * @param componentName component name
+ * @param componentGroup component group
+ * @param clusterName app name
+ * @return tokens to replace
+ */
+ public Map<String, String> getStandardTokenMap(ConfTreeOperations appConf,
+ ConfTreeOperations internals, String componentName,
+ String componentGroup, String clusterName) {
+ // delegate to the full variant with a null container ID, so no
+ // ${CONTAINER_ID} token is added
+ return getStandardTokenMap(appConf, internals, componentName,
+ componentGroup, null, clusterName);
+ }
+
+ /**
+ * Get initial token map to be substituted into config values.
+ * @param appConf app configurations
+ * @param internals internal configurations
+ * @param componentName component name
+ * @param componentGroup component group
+ * @param containerId container ID
+ * @param clusterName app name
+ * @return tokens to replace
+ */
+ public Map<String, String> getStandardTokenMap(ConfTreeOperations appConf,
+ ConfTreeOperations internals, String componentName,
+ String componentGroup, String containerId, String clusterName) {
+
+ Map<String, String> tokens = new HashMap<>();
+ if (containerId != null) {
+ tokens.put("${CONTAINER_ID}", containerId);
+ }
+ // NOTE(review): assumes site.fs.defaultFS is always set; URI.create(null)
+ // would throw NPE here — confirm it is guaranteed by the app config
+ String nnuri = appConf.get("site.fs.defaultFS");
+ tokens.put("${NN_URI}", nnuri);
+ tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
+ tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS));
+ tokens.put("${DEFAULT_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH));
+ String prefix = appConf.getComponentOpt(componentGroup, ROLE_PREFIX,
+ null);
+ String dataDirSuffix = "";
+ if (prefix == null) {
+ prefix = "";
} else {
- intVal = Integer.decode(val);
+ dataDirSuffix = "_" + SliderUtils.trimPrefix(prefix);
}
- return intVal;
+ tokens.put("${DEFAULT_DATA_DIR}", internals.getGlobalOptions()
+ .getOption(InternalKeys.INTERNAL_DATA_DIR_PATH, null) + dataDirSuffix);
+ tokens.put("${JAVA_HOME}", appConf.get(JAVA_HOME));
+ tokens.put("${COMPONENT_NAME}", componentName);
+ tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase());
+ tokens.put("${COMPONENT_PREFIX}", prefix);
+ tokens.put("${COMPONENT_PREFIX.lc}", prefix.toLowerCase());
+ // ${COMPONENT_ID} is only defined when the component name extends its
+ // group name (the suffix after the group name becomes the ID)
+ if (!componentName.equals(componentGroup) &&
+ componentName.startsWith(componentGroup)) {
+ tokens.put("${COMPONENT_ID}",
+ componentName.substring(componentGroup.length()));
+ }
+ if (clusterName != null) {
+ tokens.put("${CLUSTER_NAME}", clusterName);
+ tokens.put("${CLUSTER_NAME.lc}", clusterName.toLowerCase());
+ tokens.put("${APP_NAME}", clusterName);
+ tokens.put("${APP_NAME.lc}", clusterName.toLowerCase());
+ }
+ tokens.put("${APP_COMPONENT_NAME}", componentName);
+ tokens.put("${APP_COMPONENT_NAME.lc}", componentName.toLowerCase());
+ return tokens;
+ }
+
+ /**
+ * Add ROLE_HOST tokens for substitution into config values.
+ * Tokens take the form ${ROLENAME_HOST} (role name upper-cased), with a
+ * comma-separated list of the role's hosts as the value.
+ * @param tokens existing tokens (mutated in place)
+ * @param amState access to AM state; a null value is a no-op
+ */
+ public void addRoleRelatedTokens(Map<String, String> tokens,
+ StateAccessForProviders amState) {
+ if (amState == null) {
+ return;
+ }
+ for (Map.Entry<String, Map<String, ClusterNode>> entry :
+ amState.getRoleClusterNodeMapping().entrySet()) {
+ String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
+ String hosts = StringUtils .join(",",
+ getHostsList(entry.getValue().values(), true));
+ tokens.put("${" + tokenName + "}", hosts);
+ }
+ }
+
+ /**
+ * Add global configuration properties.
+ * @param config map where default global properties will be added
+ */
+ private void addDefaultGlobalConfig(Map<String, String> config) {
+ config.put("app_log_dir", "${LOG_DIR}");
+ config.put("app_pid_dir", "${WORK_DIR}/app/run");
+ config.put("app_install_dir", "${WORK_DIR}/app/install");
+ config.put("app_conf_dir", "${WORK_DIR}/" + APP_CONF_DIR);
+ config.put("app_input_conf_dir", "${WORK_DIR}/" + PROPAGATED_CONF_DIR_NAME);
+
+ // add optional parameters only if they are not already provided
+ // NOTE(review): in addNamedConfiguration the map is empty at this point,
+ // so these defaults always apply on that path — confirm other callers
+ if (!config.containsKey("pid_file")) {
+ config.put("pid_file", "${WORK_DIR}/app/run/component.pid");
+ }
+ if (!config.containsKey("app_root")) {
+ config.put("app_root", "${WORK_DIR}/app/install");
+ }
+ }
+
+ /**
+ * Return a list of hosts based on current ClusterNodes.
+ * @param values cluster nodes
+ * @param hostOnly whether host or host/server name will be added to list
+ * @return list of hosts
+ */
+ public Iterable<String> getHostsList(Collection<ClusterNode> values,
+ boolean hostOnly) {
+ List<String> hosts = new ArrayList<>();
+ // each entry is either "host" or "host/name" depending on hostOnly
+ for (ClusterNode cn : values) {
+ hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name);
+ }
+ return hosts;
+ }
+
+ /**
+ * Update ServiceRecord in Registry with IP and hostname.
+ * Failures to find or update the container record are logged and swallowed.
+ * @param amState access to AM state
+ * @param yarnRegistry access to YARN registry
+ * @param containerId container ID
+ * @param roleName component name
+ * @param ip list of IPs
+ * @param hostname hostname
+ */
+ public void updateServiceRecord(StateAccessForProviders amState,
+ YarnRegistryViewForProviders yarnRegistry,
+ String containerId, String roleName, List<String> ip, String hostname) {
+ try {
+ RoleInstance role = null;
+ // only the first reported IP is published on the role instance
+ if(ip != null && !ip.isEmpty()){
+ role = amState.getOwnedContainer(containerId);
+ role.ip = ip.get(0);
+ }
+ if(hostname != null && !hostname.isEmpty()){
+ role = amState.getOwnedContainer(containerId);
+ role.hostname = hostname;
+ }
+ if (role != null) {
+ // create and publish updated service record (including hostname & ip)
+ ServiceRecord record = new ServiceRecord();
+ record.set(YarnRegistryAttributes.YARN_ID, containerId);
+ // underscores replaced with hyphens — presumably for DNS-safe
+ // registry names; confirm
+ record.description = roleName.replaceAll("_", "-");
+ record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+ PersistencePolicies.CONTAINER);
+ // TODO: use constants from YarnRegistryAttributes
+ if (role.ip != null) {
+ record.set("yarn:ip", role.ip);
+ }
+ if (role.hostname != null) {
+ record.set("yarn:hostname", role.hostname);
+ }
+ yarnRegistry.putComponent(
+ RegistryPathUtils.encodeYarnID(containerId), record);
+ }
+ } catch (NoSuchNodeException e) {
+ // ignore - there is nothing to do if we don't find a container
+ log.warn("Owned container {} not found - {}", containerId, e);
+ } catch (IOException e) {
+ log.warn("Error updating container {} service record in registry",
+ containerId, e);
+ }
+ }
+
+ /**
+ * Publish a named property bag that may contain name-value pairs for app
+ * configurations such as hbase-site.
+ * The configuration is stored in the AM's published Slider configuration
+ * set under the given name.
+ * @param name config file identifying name
+ * @param description config file description
+ * @param entries config file properties
+ * @param amState access to AM state
+ */
+ public void publishApplicationInstanceData(String name, String description,
+ Iterable<Map.Entry<String, String>> entries,
+ StateAccessForProviders amState) {
+ PublishedConfiguration pubconf = new PublishedConfiguration(description,
+ entries);
+ log.info("publishing {}", pubconf);
+ amState.getPublishedSliderConfigurations().put(name, pubconf);
+ }
+
+ /**
+ * Publish an export group.
+ * Publishes twice: once in the legacy single-value-per-name configuration
+ * format, and once as a full {@link PublishedExports} set.
+ * @param exportGroup map of export name to export entries
+ * @param amState access to AM state
+ * @param roleGroup component group
+ */
+ public void publishExportGroup(Map<String, List<ExportEntry>> exportGroup,
+ StateAccessForProviders amState, String roleGroup) {
+ // Publish in old format for the time being
+ Map<String, String> simpleEntries = new HashMap<>();
+ for (Entry<String, List<ExportEntry>> entry : exportGroup.entrySet()) {
+ List<ExportEntry> exports = entry.getValue();
+ if (SliderUtils.isNotEmpty(exports)) {
+ // there is no support for multiple exports per name, so extract only
+ // the first one
+ simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
+ }
+ }
+ publishApplicationInstanceData(roleGroup, roleGroup,
+ simpleEntries.entrySet(), amState);
+
+ PublishedExports exports = new PublishedExports(roleGroup);
+ exports.setUpdated(new Date().getTime());
+ exports.putValues(exportGroup.entrySet());
+ amState.getPublishedExportsSet().put(roleGroup, exports);
+ }
+
+ /**
+ * Extract the export options declared for a component group.
+ * Keys under the export prefix are copied into the returned map (the
+ * exact key transformation is performed by propagateOptions).
+ * @param appConf app configurations
+ * @param roleGroup component group
+ * @return map of export name to export value
+ */
+ public Map<String, String> getExports(ConfTreeOperations appConf,
+ String roleGroup) {
+ Map<String, String> exports = new HashMap<>();
+ propagateOptions(appConf.getComponent(roleGroup).options, exports,
+ null, OptionKeys.EXPORT_PREFIX);
+ return exports;
+ }
+
+ // Tags and formats used when publishing container folder locations
+ // (log and working directories) to the registry.
+ private static final String COMPONENT_TAG = "component";
+ private static final String HOST_FOLDER_FORMAT = "%s:%s";
+ private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
+ private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
+
+ /**
+ * Format the folder locations and publish in the registry service.
+ * Entries keyed AGENT_LOG_ROOT or LOG_DIR go to the log export map; all
+ * other entries go to the work-folder export map. Both maps are shared
+ * and are synchronized on while being updated.
+ * @param folders folder information, keyed by folder type
+ * @param containerId container ID
+ * @param componentName component name
+ * @param hostFqdn host FQDN
+ * @param amState access to AM state
+ * @param logFolderExports shared map of container ID to log folder export
+ * @param workFolderExports shared map of container ID to work folder export
+ */
+ public void publishFolderPaths(Map<String, String> folders,
+ String containerId, String componentName, String hostFqdn,
+ StateAccessForProviders amState,
+ Map<String, ExportEntry> logFolderExports,
+ Map<String, ExportEntry> workFolderExports) {
+ Date now = new Date();
+ for (Map.Entry<String, String> entry : folders.entrySet()) {
+ ExportEntry exportEntry = new ExportEntry();
+ // value is "host:folder"
+ exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn,
+ entry.getValue()));
+ exportEntry.setContainerId(containerId);
+ exportEntry.setLevel(COMPONENT_TAG);
+ exportEntry.setTag(componentName);
+ exportEntry.setUpdatedTime(now.toString());
+ if (entry.getKey().equals("AGENT_LOG_ROOT") ||
+ entry.getKey().equals("LOG_DIR")) {
+ synchronized (logFolderExports) {
+ logFolderExports.put(containerId, exportEntry);
+ }
+ } else {
+ synchronized (workFolderExports) {
+ workFolderExports.put(containerId, exportEntry);
+ }
+ }
+ log.info("Updating log and pwd folders for container {}", containerId);
+ }
+
+ PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
+ exports.setUpdated(now.getTime());
+ synchronized (logFolderExports) {
+ updateExportsFromList(exports, logFolderExports);
+ }
+ amState.getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
+
+ exports = new PublishedExports(CONTAINER_PWDS_TAG);
+ exports.setUpdated(now.getTime());
+ synchronized (workFolderExports) {
+ updateExportsFromList(exports, workFolderExports);
+ }
+ amState.getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
+ }
+
+ /**
+ * Update the export data from the map.
+ * Entries are grouped by their component tag before being stored. Not
+ * synchronized itself; callers hold the lock on folderExports.
+ * @param exports published exports
+ * @param folderExports folder exports
+ */
+ private void updateExportsFromList(PublishedExports exports,
+ Map<String, ExportEntry> folderExports) {
+ Map<String, List<ExportEntry>> perComponentList = new HashMap<>();
+ for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet()) {
+ String componentName = logEntry.getValue().getTag();
+ if (!perComponentList.containsKey(componentName)) {
+ perComponentList.put(componentName, new ArrayList<ExportEntry>());
+ }
+ perComponentList.get(componentName).add(logEntry.getValue());
+ }
+ exports.putValues(perComponentList.entrySet());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org