You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:10:20 UTC
[20/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java
new file mode 100644
index 0000000..9767430
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.exceptions.SliderException;
+
+import java.util.List;
+public interface ProviderCore {
+
+ String getName();
+
+ List<ProviderRole> getRoles();
+
+ Configuration getConf();
+
+ /**
+ * Verify that an instance definition is considered valid by the provider
+ * @param instanceDefinition instance definition
+ * @throws SliderException if the configuration is not valid
+ */
+ void validateInstanceDefinition(AggregateConf instanceDefinition) throws
+ SliderException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java
new file mode 100644
index 0000000..761ac0f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.slider.api.ResourceKeys;
+
+/**
+ * Provider role and key for use in app requests.
+ *
+ * This class uses the role name as the key for hashes and in equality tests,
+ * and ignores the other values.
+ */
+public final class ProviderRole {
+ public final String name;
+ public final String group;
+ public final int id;
+ public int placementPolicy;
+ public int nodeFailureThreshold;
+ public final long placementTimeoutSeconds;
+ public final String labelExpression;
+
+ public ProviderRole(String name, int id) {
+ this(name,
+ name,
+ id,
+ PlacementPolicy.DEFAULT,
+ ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD,
+ ResourceKeys.DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS,
+ ResourceKeys.DEF_YARN_LABEL_EXPRESSION);
+ }
+
+ /**
+ * Create a provider role
+ * @param name role/component name
+ * @param id ID. This becomes the YARN priority
+ * @param policy placement policy
+ * @param nodeFailureThreshold threshold for node failures (within a reset interval)
+ * after which a node failure is considered an app failure
+ * @param placementTimeoutSeconds for lax placement, timeout in seconds before
+ * @param labelExpression label expression for requests; may be null
+ */
+ public ProviderRole(String name,
+ int id,
+ int policy,
+ int nodeFailureThreshold,
+ long placementTimeoutSeconds,
+ String labelExpression) {
+ this(name,
+ name,
+ id,
+ policy,
+ nodeFailureThreshold,
+ placementTimeoutSeconds,
+ labelExpression);
+ }
+
+ /**
+ * Create a provider role with a role group
+ * @param name role/component name
+ * @param group role/component group
+ * @param id ID. This becomes the YARN priority
+ * @param policy placement policy
+ * @param nodeFailureThreshold threshold for node failures (within a reset interval)
+ * after which a node failure is considered an app failure
+ * @param placementTimeoutSeconds for lax placement, timeout in seconds before
+ * @param labelExpression label expression for requests; may be null
+ */
+ public ProviderRole(String name,
+ String group,
+ int id,
+ int policy,
+ int nodeFailureThreshold,
+ long placementTimeoutSeconds,
+ String labelExpression) {
+ this.name = name;
+ if (group == null) {
+ this.group = name;
+ } else {
+ this.group = group;
+ }
+ this.id = id;
+ this.placementPolicy = policy;
+ this.nodeFailureThreshold = nodeFailureThreshold;
+ this.placementTimeoutSeconds = placementTimeoutSeconds;
+ this.labelExpression = labelExpression;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ProviderRole that = (ProviderRole) o;
+ return name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ProviderRole{");
+ sb.append("name='").append(name).append('\'');
+ sb.append(", group=").append(group);
+ sb.append(", id=").append(id);
+ sb.append(", placementPolicy=").append(placementPolicy);
+ sb.append(", nodeFailureThreshold=").append(nodeFailureThreshold);
+ sb.append(", placementTimeoutSeconds=").append(placementTimeoutSeconds);
+ sb.append(", labelExpression='").append(labelExpression).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
new file mode 100644
index 0000000..f754eee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.core.main.ExitCodeProvider;
+import org.apache.slider.server.appmaster.actions.QueueAccess;
+import org.apache.slider.server.appmaster.operations.RMOperationHandlerActions;
+import org.apache.slider.server.appmaster.state.ContainerReleaseSelector;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public interface ProviderService extends ProviderCore,
+ Service,
+ RMOperationHandlerActions,
+ ExitCodeProvider {
+
+ /**
+ * Set up the entire container launch context
+ * @param containerLauncher
+ * @param instanceDefinition
+ * @param container
+ * @param providerRole
+ * @param sliderFileSystem
+ * @param generatedConfPath
+ * @param appComponent
+ * @param containerTmpDirPath
+ */
+ void buildContainerLaunchContext(ContainerLauncher containerLauncher,
+ AggregateConf instanceDefinition,
+ Container container,
+ ProviderRole providerRole,
+ SliderFileSystem sliderFileSystem,
+ Path generatedConfPath,
+ MapOperations resourceComponent,
+ MapOperations appComponent,
+ Path containerTmpDirPath) throws
+ IOException,
+ SliderException;
+
+ /**
+ * Notify the providers of container completion
+ * @param containerId container that has completed
+ */
+ void notifyContainerCompleted(ContainerId containerId);
+
+ /**
+ * Execute a process in the AM
+ * @param instanceDefinition cluster description
+ * @param confDir configuration directory
+ * @param env environment
+ * @param execInProgress the callback for the exec events
+ * @return true if a process was actually started
+ * @throws IOException
+ * @throws SliderException
+ */
+ boolean exec(AggregateConf instanceDefinition,
+ File confDir,
+ Map<String, String> env,
+ ProviderCompleted execInProgress) throws IOException,
+ SliderException;
+
+ /**
+ * Scan through the roles and see if it is supported.
+ * @param role role to look for
+ * @return true if the role is known about -and therefore
+ * that a launcher thread can be deployed to launch it
+ */
+ boolean isSupportedRole(String role);
+
+ /**
+ * Load a specific XML configuration file for the provider config
+ * @param confDir configuration directory
+ * @return a configuration to be included in status
+ * @throws BadCommandArgumentsException
+ * @throws IOException
+ */
+ Configuration loadProviderConfigurationInformation(File confDir)
+ throws BadCommandArgumentsException, IOException;
+
+ /**
+ * The application configuration should be initialized here
+ *
+ * @param instanceDefinition
+ * @param fileSystem
+ * @throws IOException
+ * @throws SliderException
+ */
+ void initializeApplicationConfiguration(AggregateConf instanceDefinition,
+ SliderFileSystem fileSystem) throws IOException, SliderException;
+
+ /**
+ * This is a validation of the application configuration on the AM.
+ * Here is where things like the existence of keytabs and other
+ * not-seen-client-side properties can be tested, before
+ * the actual process is spawned.
+ * @param instanceDefinition clusterSpecification
+ * @param confDir configuration directory
+ * @param secure flag to indicate that secure mode checks must exist
+ * @throws IOException IO problemsn
+ * @throws SliderException any failure
+ */
+ void validateApplicationConfiguration(AggregateConf instanceDefinition,
+ File confDir,
+ boolean secure
+ ) throws IOException, SliderException;
+
+ /*
+ * Build the provider status, can be empty
+ * @return the provider status - map of entries to add to the info section
+ */
+ Map<String, String> buildProviderStatus();
+
+ /**
+ * Build a map of data intended for the AM webapp that is specific
+ * about this provider. The key is some text to be displayed, and the
+ * value can be a URL that will create an anchor over the key text.
+ *
+ * If no anchor is needed/desired, insert the key with a null value.
+ * @return the details
+ */
+ Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterSpec);
+
+ /**
+ * Get a human friendly name for web UIs and messages
+ * @return a name string. Default is simply the service instance name.
+ */
+ String getHumanName();
+
+ public void bind(StateAccessForProviders stateAccessor,
+ QueueAccess queueAccess,
+ List<Container> liveContainers);
+
+ /**
+ * Bind to the YARN registry
+ * @param yarnRegistry YARN registry
+ */
+ void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry);
+
+ /**
+ * Returns the agent rest operations interface.
+ * @return the interface if available, null otherwise.
+ */
+ AgentRestOperations getAgentRestOperations();
+
+ /**
+ * Build up the endpoint details for this service
+ * @param details
+ */
+ void buildEndpointDetails(Map<String, MonitorDetail> details);
+
+ /**
+ * Prior to going live -register the initial service registry data
+ * @param amWebURI URL to the AM. This may be proxied, so use relative paths
+ * @param agentOpsURI URI for agent operations. This will not be proxied
+ * @param agentStatusURI URI For agent status. Again: no proxy
+ * @param serviceRecord service record to build up
+ */
+ void applyInitialRegistryDefinitions(URL amWebURI,
+ URL agentOpsURI,
+ URL agentStatusURI,
+ ServiceRecord serviceRecord)
+ throws IOException;
+
+ /**
+ * Create the container release selector for this provider...any policy
+ * can be implemented
+ * @return the selector to use for choosing containers.
+ */
+ ContainerReleaseSelector createContainerReleaseSelector();
+
+ /**
+ * On AM restart (for whatever reason) this API is required to rebuild the AM
+ * internal state with the containers which were already assigned and running
+ *
+ * @param liveContainers
+ * @param applicationId
+ * @param providerRoles
+ */
+ void rebuildContainerDetails(List<Container> liveContainers,
+ String applicationId, Map<Integer, ProviderRole> providerRoles);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
new file mode 100644
index 0000000..07d106b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.OptionKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.RoleKeys;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.SliderInternalStateException;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * this is a factoring out of methods handy for providers. It's bonded to a log at
+ * construction time
+ */
+public class ProviderUtils implements RoleKeys {
+
+ protected final Logger log;
+
+ /**
+ * Create an instace
+ * @param log log directory to use -usually the provider
+ */
+
+ public ProviderUtils(Logger log) {
+ this.log = log;
+ }
+
+ /**
+ * Add oneself to the classpath. This does not work
+ * on minicluster test runs where the JAR is not built up
+ * @param providerResources map of provider resources to add these entries to
+ * @param provider provider to add
+ * @param jarName name of the jar to use
+ * @param sliderFileSystem target filesystem
+ * @param tempPath path in the cluster FS for temp files
+ * @param libdir relative directory to place resources
+ * @param miniClusterTestRun
+ * @return true if the class was found in a JAR
+ *
+ * @throws FileNotFoundException if the JAR was not found and this is NOT
+ * a mini cluster test run
+ * @throws IOException IO problems
+ * @throws SliderException any Slider problem
+ */
+ public static boolean addProviderJar(Map<String, LocalResource> providerResources,
+ Object provider,
+ String jarName,
+ SliderFileSystem sliderFileSystem,
+ Path tempPath,
+ String libdir,
+ boolean miniClusterTestRun) throws
+ IOException,
+ SliderException {
+ try {
+ SliderUtils.putJar(providerResources,
+ sliderFileSystem,
+ provider.getClass(),
+ tempPath,
+ libdir,
+ jarName);
+ return true;
+ } catch (FileNotFoundException e) {
+ if (miniClusterTestRun) {
+ return false;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Add/overwrite the agent tarball (overwritten every time application is restarted)
+ * @param provider
+ * @param tarName
+ * @param sliderFileSystem
+ * @param agentDir
+ * @return true the location could be determined and the file added
+ * @throws IOException
+ */
+ public static boolean addAgentTar(Object provider,
+ String tarName,
+ SliderFileSystem sliderFileSystem,
+ Path agentDir) throws
+ IOException {
+ File localFile = SliderUtils.findContainingJar(provider.getClass());
+ if(localFile != null) {
+ String parentDir = localFile.getParent();
+ Path agentTarPath = new Path(parentDir, tarName);
+ sliderFileSystem.getFileSystem().copyFromLocalFile(false, true, agentTarPath, agentDir);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Add a set of dependencies to the provider resources being built up,
+ * by copying them from the local classpath to the remote one, then
+ * registering them
+ * @param providerResources map of provider resources to add these entries to
+ * @param sliderFileSystem target filesystem
+ * @param tempPath path in the cluster FS for temp files
+ * @param libdir relative directory to place resources
+ * @param resources list of resource names (e.g. "hbase.jar"
+ * @param classes list of classes where classes[i] refers to a class in
+ * resources[i]
+ * @throws IOException IO problems
+ * @throws SliderException any Slider problem
+ */
+ public static void addDependencyJars(Map<String, LocalResource> providerResources,
+ SliderFileSystem sliderFileSystem,
+ Path tempPath,
+ String libdir,
+ String[] resources,
+ Class[] classes
+ ) throws
+ IOException,
+ SliderException {
+ if (resources.length != classes.length) {
+ throw new SliderInternalStateException(
+ "mismatch in Jar names [%d] and classes [%d]",
+ resources.length,
+ classes.length);
+ }
+ int size = resources.length;
+ for (int i = 0; i < size; i++) {
+ String jarName = resources[i];
+ Class clazz = classes[i];
+ SliderUtils.putJar(providerResources,
+ sliderFileSystem,
+ clazz,
+ tempPath,
+ libdir,
+ jarName);
+ }
+
+ }
+
+ /**
+ * Loads all dependency jars from the default path
+ * @param providerResources map of provider resources to add these entries to
+ * @param sliderFileSystem target filesystem
+ * @param tempPath path in the cluster FS for temp files
+ * @param libDir relative directory to place resources
+ * @param libLocalSrcDir explicitly supplied local libs dir
+ * @throws IOException
+ * @throws SliderException
+ */
+ public static void addAllDependencyJars(Map<String, LocalResource> providerResources,
+ SliderFileSystem sliderFileSystem,
+ Path tempPath,
+ String libDir,
+ String libLocalSrcDir)
+ throws IOException, SliderException {
+ String libSrcToUse = libLocalSrcDir;
+ if (SliderUtils.isSet(libLocalSrcDir)) {
+ File file = new File(libLocalSrcDir);
+ if (!file.exists() || !file.isDirectory()) {
+ throw new BadCommandArgumentsException("Supplied lib src dir %s is not valid", libLocalSrcDir);
+ }
+ }
+ SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, libDir, libSrcToUse);
+ }
+
+ /**
+ * build the log directory
+ * @return the log dir
+ */
+ public String getLogdir() throws IOException {
+ String logdir = System.getenv("LOGDIR");
+ if (logdir == null) {
+ logdir =
+ SliderKeys.TMP_LOGDIR_PREFIX + UserGroupInformation.getCurrentUser().getShortUserName();
+ }
+ return logdir;
+ }
+
+
+ public void validateNodeCount(AggregateConf instanceDescription,
+ String name, int min, int max) throws
+ BadCommandArgumentsException {
+ MapOperations component =
+ instanceDescription.getResourceOperations().getComponent(name);
+ int count;
+ if (component == null) {
+ count = 0;
+ } else {
+ count = component.getOptionInt(ResourceKeys.COMPONENT_INSTANCES, 0);
+ }
+ validateNodeCount(name, count, min, max);
+ }
+
+ /**
+ * Validate the node count and heap size values of a node class
+ * <p>
+ * If max <= 0: min <= count
+ * If max > 0: min <= count <= max
+ * @param name node class name
+ * @param count requested node count
+ * @param min requested heap size
+ * @param max maximum value.
+ * @throws BadCommandArgumentsException if the values are out of range
+ */
+ public void validateNodeCount(String name,
+ int count,
+ int min,
+ int max) throws BadCommandArgumentsException {
+ if (count < min) {
+ throw new BadCommandArgumentsException(
+ "requested no of %s nodes: %d is below the minimum of %d", name, count,
+ min);
+ }
+ if (max > 0 && count > max) {
+ throw new BadCommandArgumentsException(
+ "requested no of %s nodes: %d is above the maximum of %d", name, count,
+ max);
+ }
+ }
+
+ /**
+ * copy all options beginning site. into the site.xml
+ * @param clusterSpec cluster specification
+ * @param sitexml map for XML file to build up
+ */
+ public void propagateSiteOptions(ClusterDescription clusterSpec,
+ Map<String, String> sitexml) {
+ Map<String, String> options = clusterSpec.options;
+ propagateSiteOptions(options, sitexml);
+ }
+
+ public void propagateSiteOptions(Map<String, String> options,
+ Map<String, String> sitexml) {
+ propagateSiteOptions(options, sitexml, "");
+ }
+
+ public void propagateSiteOptions(Map<String, String> options,
+ Map<String, String> sitexml,
+ String configName) {
+ propagateSiteOptions(options, sitexml, configName, null);
+ }
+
+ public void propagateSiteOptions(Map<String, String> options,
+ Map<String, String> sitexml,
+ String configName,
+ Map<String,String> tokenMap) {
+ String prefix = OptionKeys.SITE_XML_PREFIX +
+ (!configName.isEmpty() ? configName + "." : "");
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ String envName = key.substring(prefix.length());
+ if (!envName.isEmpty()) {
+ String value = entry.getValue();
+ if (tokenMap != null) {
+ for (Map.Entry<String,String> token : tokenMap.entrySet()) {
+ value = value.replaceAll(Pattern.quote(token.getKey()),
+ token.getValue());
+ }
+ }
+ sitexml.put(envName, value);
+ }
+ }
+ }
+ }
+
+ /**
+ * Propagate an option from the cluster specification option map
+ * to the site XML map, using the site key for the name
+ * @param global global config spec
+ * @param optionKey key in the option map
+ * @param sitexml map for XML file to build up
+ * @param siteKey key to assign the value to in the site XML
+ * @throws BadConfigException if the option is missing from the cluster spec
+ */
+ public void propagateOption(MapOperations global,
+ String optionKey,
+ Map<String, String> sitexml,
+ String siteKey) throws BadConfigException {
+ sitexml.put(siteKey, global.getMandatoryOption(optionKey));
+ }
+
+
+ /**
+ * Build the image dir. This path is relative and only valid at the far end
+ * @param instanceDefinition instance definition
+ * @param bindir bin subdir
+ * @param script script in bin subdir
+ * @return the path to the script
+ * @throws FileNotFoundException if a file is not found, or it is not a directory*
+ */
+ public String buildPathToHomeDir(AggregateConf instanceDefinition,
+ String bindir,
+ String script) throws
+ FileNotFoundException,
+ BadConfigException {
+ MapOperations globalOptions =
+ instanceDefinition.getInternalOperations().getGlobalOptions();
+ String applicationHome =
+ globalOptions.get(InternalKeys.INTERNAL_APPLICATION_HOME);
+ String imagePath =
+ globalOptions.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+ return buildPathToHomeDir(imagePath, applicationHome, bindir, script);
+ }
+
+ public String buildPathToHomeDir(String imagePath,
+ String applicationHome,
+ String bindir, String script) throws
+ FileNotFoundException {
+ String path;
+ File scriptFile;
+ if (imagePath != null) {
+ File tarball = new File(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR);
+ scriptFile = findBinScriptInExpandedArchive(tarball, bindir, script);
+ // now work back from the script to build the relative path
+ // to the binary which will be valid remote or local
+ StringBuilder builder = new StringBuilder();
+ builder.append(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR);
+ builder.append("/");
+ //for the script, we want the name of ../..
+ File archive = scriptFile.getParentFile().getParentFile();
+ builder.append(archive.getName());
+ path = builder.toString();
+
+ } else {
+ // using a home directory which is required to be present on
+ // the local system -so will be absolute and resolvable
+ File homedir = new File(applicationHome);
+ path = homedir.getAbsolutePath();
+
+ //this is absolute, resolve its entire path
+ SliderUtils.verifyIsDir(homedir, log);
+ File bin = new File(homedir, bindir);
+ SliderUtils.verifyIsDir(bin, log);
+ scriptFile = new File(bin, script);
+ SliderUtils.verifyFileExists(scriptFile, log);
+ }
+ return path;
+ }
+
+
+ /**
+ * Build the image dir. This path is relative and only valid at the far end
+ * @param instance instance options
+ * @param bindir bin subdir
+ * @param script script in bin subdir
+ * @return the path to the script
+ * @throws FileNotFoundException if a file is not found, or it is not a directory*
+ */
+ public String buildPathToScript(AggregateConf instance,
+ String bindir,
+ String script) throws FileNotFoundException {
+ return buildPathToScript(instance.getInternalOperations(), bindir, script);
+ }
+ /**
+ * Build the image dir. This path is relative and only valid at the far end
+ * @param internal internal options
+ * @param bindir bin subdir
+ * @param script script in bin subdir
+ * @return the path to the script
+ * @throws FileNotFoundException if a file is not found, or it is not a directory*
+ */
+ public String buildPathToScript(ConfTreeOperations internal,
+ String bindir,
+ String script) throws FileNotFoundException {
+
+ String homedir = buildPathToHomeDir(
+ internal.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH),
+ internal.get(InternalKeys.INTERNAL_APPLICATION_HOME),
+ bindir,
+ script);
+ return buildScriptPath(bindir, script, homedir);
+ }
+
+
+
+ public String buildScriptPath(String bindir, String script, String homedir) {
+ StringBuilder builder = new StringBuilder(homedir);
+ builder.append("/");
+ builder.append(bindir);
+ builder.append("/");
+ builder.append(script);
+ return builder.toString();
+ }
+
+
+ public static String convertToAppRelativePath(File file) {
+ return convertToAppRelativePath(file.getPath());
+ }
+
+ public static String convertToAppRelativePath(String path) {
+ return ApplicationConstants.Environment.PWD.$() + "/" + path;
+ }
+
+
+ public static void validatePathReferencesLocalDir(String meaning, String path)
+ throws BadConfigException {
+ File file = new File(path);
+ if (!file.exists()) {
+ throw new BadConfigException("%s directory %s not found", meaning, file);
+ }
+ if (!file.isDirectory()) {
+ throw new BadConfigException("%s is not a directory: %s", meaning, file);
+ }
+ }
+
+ /**
+ * get the user name
+ * @return the user name
+ */
+ public String getUserName() throws IOException {
+ return UserGroupInformation.getCurrentUser().getShortUserName();
+ }
+
+ /**
+ * Find a script in an expanded archive
+ * @param base base directory
+ * @param bindir bin subdir
+ * @param script script in bin subdir
+ * @return the path to the script
+ * @throws FileNotFoundException if a file is not found, or it is not a directory
+ */
+ public File findBinScriptInExpandedArchive(File base,
+ String bindir,
+ String script)
+ throws FileNotFoundException {
+
+ SliderUtils.verifyIsDir(base, log);
+ File[] ls = base.listFiles();
+ if (ls == null) {
+ //here for the IDE to be happy, as the previous check will pick this case
+ throw new FileNotFoundException("Failed to list directory " + base);
+ }
+
+ log.debug("Found {} entries in {}", ls.length, base);
+ List<File> directories = new LinkedList<File>();
+ StringBuilder dirs = new StringBuilder();
+ for (File file : ls) {
+ log.debug("{}", false);
+ if (file.isDirectory()) {
+ directories.add(file);
+ dirs.append(file.getPath()).append(" ");
+ }
+ }
+ if (directories.size() > 1) {
+ throw new FileNotFoundException(
+ "Too many directories in archive to identify binary: " + dirs);
+ }
+ if (directories.isEmpty()) {
+ throw new FileNotFoundException(
+ "No directory found in archive " + base);
+ }
+ File archive = directories.get(0);
+ File bin = new File(archive, bindir);
+ SliderUtils.verifyIsDir(bin, log);
+ File scriptFile = new File(bin, script);
+ SliderUtils.verifyFileExists(scriptFile, log);
+ return scriptFile;
+ }
+
+ /**
+ * Return any additional arguments (argv) to provide when starting this role
+ *
+ * @param roleOptions
+ * The options for this role
+ * @return A non-null String which contains command line arguments for this role, or the empty string.
+ */
+ public static String getAdditionalArgs(Map<String,String> roleOptions) {
+ if (roleOptions.containsKey(RoleKeys.ROLE_ADDITIONAL_ARGS)) {
+ String additionalArgs = roleOptions.get(RoleKeys.ROLE_ADDITIONAL_ARGS);
+ if (null != additionalArgs) {
+ return additionalArgs;
+ }
+ }
+
+ return "";
+ }
+
+ public int getRoleResourceRequirement(String val,
+ int defVal,
+ int maxVal) {
+ if (val==null) {
+ val = Integer.toString(defVal);
+ }
+ Integer intVal;
+ if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) {
+ intVal = maxVal;
+ } else {
+ intVal = Integer.decode(val);
+ }
+ return intVal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java
new file mode 100644
index 0000000..5dd4a32
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for factories
+ */
+public abstract class SliderProviderFactory extends Configured {
+
+ public static final String DEFAULT_CLUSTER_TYPE = AgentKeys.PROVIDER_AGENT;
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(SliderProviderFactory.class);
+ public static final String PROVIDER_NOT_FOUND =
+ "Unable to find provider of application type %s";
+
+ public SliderProviderFactory(Configuration conf) {
+ super(conf);
+ }
+
+ protected SliderProviderFactory() {
+ }
+
+ public abstract AbstractClientProvider createClientProvider();
+
+ public abstract ProviderService createServerProvider();
+
+ /**
+ * Create a provider for a specific application
+ * @param application app
+ * @return app instance
+ * @throws SliderException on any instantiation problem
+ */
+ public static SliderProviderFactory createSliderProviderFactory(String application) throws
+ SliderException {
+ Configuration conf = loadSliderConfiguration();
+ if (application == null) {
+ application = DEFAULT_CLUSTER_TYPE;
+ }
+ String providerKey =
+ String.format(SliderXmlConfKeys.KEY_PROVIDER, application);
+ if (application.contains(".")) {
+ log.debug("Treating {} as a classname", application);
+ String name = "classname.key";
+ conf.set(name, application);
+ providerKey = name;
+ }
+
+ Class<? extends SliderProviderFactory> providerClass;
+ try {
+ providerClass = conf.getClass(providerKey, null, SliderProviderFactory.class);
+ } catch (RuntimeException e) {
+ throw new BadClusterStateException(e, "Failed to load provider %s: %s", application, e);
+ }
+ if (providerClass == null) {
+ throw new BadClusterStateException(PROVIDER_NOT_FOUND, application);
+ }
+
+ Exception ex;
+ try {
+ SliderProviderFactory providerFactory = providerClass.newInstance();
+ providerFactory.setConf(conf);
+ return providerFactory;
+ } catch (Exception e) {
+ ex = e;
+ }
+ //by here the operation failed and ex is set to the value
+ throw new BadClusterStateException(ex,
+ "Failed to create an instance of %s : %s",
+ providerClass,
+ ex);
+ }
+
+ /**
+ * Load a configuration with the {@link SliderKeys#SLIDER_XML} resource
+ * included
+ * @return a configuration instance
+ */
+ public static Configuration loadSliderConfiguration() {
+ Configuration conf = new Configuration();
+ conf.addResource(SliderKeys.SLIDER_XML);
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
new file mode 100644
index 0000000..4c6a50b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+import com.google.common.io.Files;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.client.ClientUtils;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.launch.AbstractLauncher;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.providers.ProviderUtils;
+import org.apache.slider.providers.agent.application.metadata.Application;
+import org.apache.slider.providers.agent.application.metadata.Component;
+import org.apache.slider.providers.agent.application.metadata.ConfigFile;
+import org.apache.slider.providers.agent.application.metadata.Metainfo;
+import org.apache.slider.providers.agent.application.metadata.MetainfoParser;
+import org.apache.slider.providers.agent.application.metadata.OSPackage;
+import org.apache.slider.providers.agent.application.metadata.OSSpecific;
+import org.apache.slider.providers.agent.application.metadata.Package;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** This class implements the client-side aspects of the agent deployer */
+public class AgentClientProvider extends AbstractClientProvider
+ implements AgentKeys, SliderKeys {
+
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(AgentClientProvider.class);
+ protected static final String NAME = "agent";
+ private static final ProviderUtils providerUtils = new ProviderUtils(log);
+ public static final String E_COULD_NOT_READ_METAINFO
+ = "Not a valid app package. Could not read metainfo.";
+
+ protected Map<String, Metainfo> metaInfoMap = new ConcurrentHashMap<String, Metainfo>();
+
+ protected AgentClientProvider(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public List<ProviderRole> getRoles() {
+ return AgentRoles.getRoles();
+ }
+
+ @Override //Client
+ public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
+ String clustername,
+ Configuration configuration,
+ AggregateConf instanceDefinition,
+ Path clusterDirPath,
+ Path generatedConfDirPath,
+ boolean secure) throws
+ SliderException,
+ IOException {
+ super.preflightValidateClusterConfiguration(sliderFileSystem, clustername,
+ configuration,
+ instanceDefinition,
+ clusterDirPath,
+ generatedConfDirPath, secure);
+
+ String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition
+ .getAppConfOperations());
+ Path appDefPath = new Path(appDef);
+ sliderFileSystem.verifyFileExists(appDefPath);
+
+ String agentConf = instanceDefinition.getAppConfOperations().
+ getGlobalOptions().getOption(AgentKeys.AGENT_CONF, "");
+ if (StringUtils.isNotEmpty(agentConf)) {
+ sliderFileSystem.verifyFileExists(new Path(agentConf));
+ }
+
+ String appHome = instanceDefinition.getAppConfOperations().
+ getGlobalOptions().get(AgentKeys.PACKAGE_PATH);
+ if (SliderUtils.isUnset(appHome)) {
+ String agentImage = instanceDefinition.getInternalOperations().
+ get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+ sliderFileSystem.verifyFileExists(new Path(agentImage));
+ }
+ }
+
+ @Override
+ public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
+ SliderException {
+ super.validateInstanceDefinition(instanceDefinition, fs);
+ log.debug("Validating conf {}", instanceDefinition);
+ ConfTreeOperations resources =
+ instanceDefinition.getResourceOperations();
+
+ providerUtils.validateNodeCount(instanceDefinition, ROLE_NODE,
+ 0, -1);
+
+ String appDef = null;
+ try {
+ // Validate the app definition
+ appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition
+ .getAppConfOperations());
+ } catch (BadConfigException bce) {
+ throw new BadConfigException("Application definition must be provided. " + bce.getMessage());
+ }
+
+ log.info("Validating app definition {}", appDef);
+ String extension = appDef.substring(appDef.lastIndexOf(".") + 1, appDef.length());
+ if (!"zip".equals(extension.toLowerCase(Locale.ENGLISH))) {
+ throw new BadConfigException("App definition must be packaged as a .zip file. File provided is " + appDef);
+ }
+
+ Set<String> names = resources.getComponentNames();
+ names.remove(SliderKeys.COMPONENT_AM);
+ Map<Integer, String> priorityMap = new HashMap<Integer, String>();
+
+ Metainfo metaInfo = getMetainfo(fs, appDef);
+
+ for (String name : names) {
+ MapOperations component = resources.getMandatoryComponent(name);
+
+ if (metaInfo != null) {
+ Component componentDef = metaInfo.getApplicationComponent(name);
+ if (componentDef == null) {
+ throw new BadConfigException(
+ "Component %s is not a member of application.", name);
+ }
+ }
+
+ int priority =
+ component.getMandatoryOptionInt(ResourceKeys.COMPONENT_PRIORITY);
+ if (priority <= 0) {
+ throw new BadConfigException("Component %s %s value out of range %d",
+ name,
+ ResourceKeys.COMPONENT_PRIORITY,
+ priority);
+ }
+
+ String existing = priorityMap.get(priority);
+ if (existing != null) {
+ throw new BadConfigException(
+ "Component %s has a %s value %d which duplicates that of %s",
+ name,
+ ResourceKeys.COMPONENT_PRIORITY,
+ priority,
+ existing);
+ }
+ priorityMap.put(priority, name);
+ }
+
+ // fileSystem may be null for tests
+ if (metaInfo != null) {
+ for (String name : names) {
+ Component componentDef = metaInfo.getApplicationComponent(name);
+ if (componentDef == null) {
+ throw new BadConfigException(
+ "Component %s is not a member of application.", name);
+ }
+
+ // ensure that intance count is 0 for client components
+ if ("CLIENT".equals(componentDef.getCategory())) {
+ MapOperations componentConfig = resources.getMandatoryComponent(name);
+ int count =
+ componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES);
+ if (count > 0) {
+ throw new BadConfigException("Component %s is of type CLIENT and cannot be instantiated."
+ + " Use \"slider client install ...\" command instead.",
+ name);
+ }
+ } else {
+ MapOperations componentConfig = resources.getMandatoryComponent(name);
+ int count =
+ componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES);
+ int definedMinCount = componentDef.getMinInstanceCountInt();
+ int definedMaxCount = componentDef.getMaxInstanceCountInt();
+ if (count < definedMinCount || count > definedMaxCount) {
+ throw new BadConfigException("Component %s, %s value %d out of range. "
+ + "Expected minimum is %d and maximum is %d",
+ name,
+ ResourceKeys.COMPONENT_INSTANCES,
+ count,
+ definedMinCount,
+ definedMaxCount);
+ }
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
+ Configuration serviceConf,
+ AbstractLauncher launcher,
+ AggregateConf instanceDefinition,
+ Path snapshotConfDirPath,
+ Path generatedConfDirPath,
+ Configuration clientConfExtras,
+ String libdir,
+ Path tempPath,
+ boolean miniClusterTestRun) throws
+ IOException,
+ SliderException {
+ String agentImage = instanceDefinition.getInternalOperations().
+ get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
+ if (SliderUtils.isUnset(agentImage)) {
+ Path agentPath = new Path(tempPath.getParent(), AgentKeys.PROVIDER_AGENT);
+ log.info("Automatically uploading the agent tarball at {}", agentPath);
+ fileSystem.getFileSystem().mkdirs(agentPath);
+ if (ProviderUtils.addAgentTar(this, AGENT_TAR, fileSystem, agentPath)) {
+ instanceDefinition.getInternalOperations().set(
+ InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH,
+ new Path(agentPath, AGENT_TAR).toUri());
+ }
+ }
+ }
+
+ @Override
+ public Set<String> getApplicationTags(SliderFileSystem fileSystem,
+ String appDef) throws SliderException {
+ Set<String> tags;
+ Metainfo metaInfo = getMetainfo(fileSystem, appDef);
+
+ if (metaInfo == null) {
+ log.error("Error retrieving metainfo from {}", appDef);
+ throw new SliderException("Error parsing metainfo file, possibly bad structure.");
+ }
+
+ Application application = metaInfo.getApplication();
+ tags = new HashSet<String>();
+ tags.add("Name: " + application.getName());
+ tags.add("Version: " + application.getVersion());
+ tags.add("Description: " + SliderUtils.truncate(application.getComment(), 80));
+
+ return tags;
+ }
+
+ @Override
+ public void processClientOperation(SliderFileSystem fileSystem,
+ RegistryOperations rops,
+ Configuration configuration,
+ String operation,
+ File clientInstallPath,
+ File appPackage,
+ JSONObject config,
+ String name) throws SliderException {
+ // create temp folder
+ // create sub-folders app_pkg, agent_pkg, command
+ File tmpDir = Files.createTempDir();
+ log.info("Command is being executed at {}", tmpDir.getAbsolutePath());
+ File appPkgDir = new File(tmpDir, "app_pkg");
+ appPkgDir.mkdir();
+
+ File agentPkgDir = new File(tmpDir, "agent_pkg");
+ agentPkgDir.mkdir();
+
+ File cmdDir = new File(tmpDir, "command");
+ cmdDir.mkdir();
+
+ Metainfo metaInfo = null;
+ JSONObject defaultConfig = null;
+ try {
+ // expand app package into /app_pkg
+ ZipInputStream zipInputStream = null;
+ try {
+ zipInputStream = new ZipInputStream(new FileInputStream(appPackage));
+ {
+ ZipEntry zipEntry = zipInputStream.getNextEntry();
+ while (zipEntry != null) {
+ log.info("Processing {}", zipEntry.getName());
+ String filePath = appPkgDir + File.separator + zipEntry.getName();
+ if (!zipEntry.isDirectory()) {
+ log.info("Extracting file {}", filePath);
+ extractFile(zipInputStream, filePath);
+
+ if ("metainfo.xml".equals(zipEntry.getName())) {
+ FileInputStream input = null;
+ try {
+ input = new FileInputStream(filePath);
+ metaInfo = new MetainfoParser().fromXmlStream(input);
+ } finally {
+ IOUtils.closeStream(input);
+ }
+ } else if ("metainfo.json".equals(zipEntry.getName())) {
+ FileInputStream input = null;
+ try {
+ input = new FileInputStream(filePath);
+ metaInfo = new MetainfoParser().fromJsonStream(input);
+ } finally {
+ IOUtils.closeStream(input);
+ }
+ } else if ("clientInstallConfig-default.json".equals(zipEntry.getName())) {
+ try {
+ defaultConfig = new JSONObject(FileUtils.readFileToString(new File(filePath), Charset.defaultCharset()));
+ } catch (JSONException jex) {
+ throw new SliderException("Unable to read default client config.", jex);
+ }
+ }
+ } else {
+ log.info("Creating dir {}", filePath);
+ File dir = new File(filePath);
+ dir.mkdir();
+ }
+ zipInputStream.closeEntry();
+ zipEntry = zipInputStream.getNextEntry();
+ }
+ }
+ } finally {
+ zipInputStream.close();
+ }
+
+ if (metaInfo == null) {
+ throw new BadConfigException(E_COULD_NOT_READ_METAINFO);
+ }
+
+ String clientScript = null;
+ String clientComponent = null;
+ for (Component component : metaInfo.getApplication().getComponents()) {
+ if (component.getCategory().equals("CLIENT")) {
+ clientComponent = component.getName();
+ if (component.getCommandScript() != null) {
+ clientScript = component.getCommandScript().getScript();
+ }
+ break;
+ }
+ }
+
+ if (SliderUtils.isUnset(clientScript)) {
+ log.info("Installing CLIENT without script");
+ List<Package> packages = metaInfo.getApplication().getPackages();
+ if (packages.size() > 0) {
+ // retrieve package resources from HDFS and extract
+ for (Package pkg : packages) {
+ Path pkgPath = fileSystem.buildResourcePath(pkg.getName());
+ if (!fileSystem.isFile(pkgPath) && name != null) {
+ pkgPath = fileSystem.buildResourcePath(name, pkg.getName());
+ }
+ if (!fileSystem.isFile(pkgPath)) {
+ throw new IOException("Package doesn't exist as a resource: " +
+ pkg.getName());
+ }
+ if ("archive".equals(pkg.getType())) {
+ File pkgFile = new File(tmpDir, pkg.getName());
+ fileSystem.copyHdfsFileToLocal(pkgPath, pkgFile);
+ expandTar(pkgFile, clientInstallPath);
+ } else {
+ File pkgFile = new File(clientInstallPath, pkg.getName());
+ fileSystem.copyHdfsFileToLocal(pkgPath, pkgFile);
+ }
+ }
+ } else {
+ // extract tarball from app def
+ for (OSSpecific osSpecific : metaInfo.getApplication()
+ .getOSSpecifics()) {
+ for (OSPackage pkg : osSpecific.getPackages()) {
+ if ("tarball".equals(pkg.getType())) {
+ File pkgFile = new File(appPkgDir, pkg.getName());
+ expandTar(pkgFile, clientInstallPath);
+ }
+ }
+ }
+ }
+ if (name == null) {
+ log.warn("Conf files not being generated because no app name was " +
+ "provided");
+ return;
+ }
+ File confInstallDir;
+ String clientRoot = null;
+ if (config != null) {
+ try {
+ clientRoot = config.getJSONObject("global")
+ .getString(AgentKeys.APP_CLIENT_ROOT);
+ } catch (JSONException e) {
+ log.info("Couldn't read {} from provided client config, falling " +
+ "back on default", AgentKeys.APP_CLIENT_ROOT);
+ }
+ }
+ if (clientRoot == null && defaultConfig != null) {
+ try {
+ clientRoot = defaultConfig.getJSONObject("global")
+ .getString(AgentKeys.APP_CLIENT_ROOT);
+ } catch (JSONException e) {
+ log.info("Couldn't read {} from default client config, using {}",
+ AgentKeys.APP_CLIENT_ROOT, clientInstallPath);
+ }
+ }
+ if (clientRoot == null) {
+ confInstallDir = clientInstallPath;
+ } else {
+ confInstallDir = new File(new File(clientInstallPath, clientRoot), "conf");
+ if (!confInstallDir.exists()) {
+ confInstallDir.mkdirs();
+ }
+ }
+ String user = RegistryUtils.currentUser();
+ for (ConfigFile configFile : metaInfo.getComponentConfigFiles(clientComponent)) {
+ retrieveConfigFile(rops, configuration, configFile, name, user,
+ confInstallDir);
+ }
+ } else {
+ log.info("Installing CLIENT using script {}", clientScript);
+ expandAgentTar(agentPkgDir);
+
+ JSONObject commandJson = getCommandJson(defaultConfig, config, metaInfo, clientInstallPath, name);
+ FileWriter file = new FileWriter(new File(cmdDir, "command.json"));
+ try {
+ file.write(commandJson.toString());
+
+ } catch (IOException e) {
+ log.error("Couldn't write command.json to file");
+ } finally {
+ file.flush();
+ file.close();
+ }
+
+ runCommand(appPkgDir, agentPkgDir, cmdDir, clientScript);
+ }
+
+ } catch (IOException ioex) {
+ log.warn("Error while executing INSTALL command {}", ioex.getMessage());
+ throw new SliderException("INSTALL client failed.");
+ }
+ }
+
+ protected void runCommand(
+ File appPkgDir,
+ File agentPkgDir,
+ File cmdDir,
+ String clientScript) throws SliderException {
+ int exitCode = 0;
+ Exception exp = null;
+ try {
+ String clientScriptPath = appPkgDir.getAbsolutePath() + File.separator + "package" +
+ File.separator + clientScript;
+ List<String> command = Arrays.asList(AgentKeys.PYTHON_EXE,
+ "-S",
+ clientScriptPath,
+ "INSTALL",
+ cmdDir.getAbsolutePath() + File.separator + "command.json",
+ appPkgDir.getAbsolutePath() + File.separator + "package",
+ cmdDir.getAbsolutePath() + File.separator + "command-out.json",
+ "DEBUG");
+ ProcessBuilder pb = new ProcessBuilder(command);
+ log.info("Command: " + StringUtils.join(pb.command(), " "));
+ pb.environment().put(SliderKeys.PYTHONPATH,
+ agentPkgDir.getAbsolutePath()
+ + File.separator + "slider-agent" + File.pathSeparator
+ + agentPkgDir.getAbsolutePath()
+ + File.separator + "slider-agent/jinja2");
+ log.info("{}={}", SliderKeys.PYTHONPATH, pb.environment().get(SliderKeys.PYTHONPATH));
+
+ Process proc = pb.start();
+ InputStream stderr = proc.getErrorStream();
+ InputStream stdout = proc.getInputStream();
+ BufferedReader stdOutReader = new BufferedReader(new InputStreamReader(stdout));
+ BufferedReader stdErrReader = new BufferedReader(new InputStreamReader(stderr));
+
+ proc.waitFor();
+
+ String line;
+ while ((line = stdOutReader.readLine()) != null) {
+ log.info("Stdout: " + line);
+ }
+ while ((line = stdErrReader.readLine()) != null) {
+ log.info("Stderr: " + line);
+ }
+
+ exitCode = proc.exitValue();
+ log.info("Exit value is {}", exitCode);
+ } catch (IOException e) {
+ exp = e;
+ } catch (InterruptedException e) {
+ exp = e;
+ }
+
+ if (exitCode != 0) {
+ throw new SliderException("INSTALL client failed with exit code " + exitCode);
+ }
+
+ if (exp != null) {
+ log.error("Error while executing INSTALL command {}. Stack trace {}",
+ exp.getMessage(),
+ ExceptionUtils.getStackTrace(exp));
+ throw new SliderException("INSTALL client failed.", exp);
+ }
+ }
+
+ private void expandAgentTar(File agentPkgDir) throws IOException {
+ String libDirProp =
+ System.getProperty(SliderKeys.PROPERTY_LIB_DIR);
+ File tarFile = new File(libDirProp, SliderKeys.AGENT_TAR);
+ expandTar(tarFile, agentPkgDir);
+ }
+
+ private void expandTar(File tarFile, File destDir) throws IOException {
+ log.info("Expanding tar {} to {}", tarFile, destDir);
+ TarArchiveInputStream tarIn = new TarArchiveInputStream(
+ new GzipCompressorInputStream(
+ new BufferedInputStream(
+ new FileInputStream(tarFile)
+ )
+ )
+ );
+ try {
+ TarArchiveEntry tarEntry = tarIn.getNextTarEntry();
+ while (tarEntry != null) {
+ File destPath = new File(destDir, tarEntry.getName());
+ File parent = destPath.getParentFile();
+ if (!parent.exists()) {
+ parent.mkdirs();
+ }
+ if (tarEntry.isDirectory()) {
+ destPath.mkdirs();
+ } else {
+ byte[] byteToRead = new byte[1024];
+ BufferedOutputStream buffOut =
+ new BufferedOutputStream(new FileOutputStream(destPath));
+ try {
+ int len;
+ while ((len = tarIn.read(byteToRead)) != -1) {
+ buffOut.write(byteToRead, 0, len);
+ }
+ } finally {
+ buffOut.close();
+ }
+ }
+ if ((tarEntry.getMode() & 0100) != 0) {
+ destPath.setExecutable(true);
+ }
+ tarEntry = tarIn.getNextTarEntry();
+ }
+ } finally {
+ tarIn.close();
+ }
+ }
+
+ private void retrieveConfigFile(RegistryOperations rops,
+ Configuration configuration, ConfigFile configFile, String name,
+ String user, File destDir) throws IOException, SliderException {
+ log.info("Retrieving config {} to {}", configFile.getDictionaryName(),
+ destDir);
+ PublishedConfiguration published = ClientUtils.getConfigFromRegistry(rops,
+ configuration, configFile.getDictionaryName(), name, user, true);
+ ClientUtils.saveOrReturnConfig(published, configFile.getType(),
+ destDir, configFile.getFileName());
+ }
+
+ protected JSONObject getCommandJson(JSONObject defaultConfig,
+ JSONObject inputConfig,
+ Metainfo metainfo,
+ File clientInstallPath,
+ String name) throws SliderException {
+ try {
+ JSONObject pkgList = new JSONObject();
+ pkgList.put(AgentKeys.PACKAGE_LIST,
+ AgentProviderService.getPackageListFromApplication(metainfo.getApplication()));
+ JSONObject obj = new JSONObject();
+ obj.put("hostLevelParams", pkgList);
+
+ String user = RegistryUtils.currentUser();
+ JSONObject configuration = new JSONObject();
+ JSONObject global = new JSONObject();
+ global.put("app_install_dir", clientInstallPath.getAbsolutePath());
+ global.put("app_user", user);
+ if (name != null) {
+ global.put("app_name", name);
+ }
+
+ if (defaultConfig != null) {
+ readConfigEntries(defaultConfig, clientInstallPath, global, name, user);
+ }
+ if (inputConfig != null) {
+ readConfigEntries(inputConfig, clientInstallPath, global, name, user);
+ }
+
+ configuration.put("global", global);
+ obj.put("configurations", configuration);
+ return obj;
+ } catch (JSONException jex) {
+ log.warn("Error while executing INSTALL command {}", jex.getMessage());
+ throw new SliderException("INSTALL client failed.");
+ }
+ }
+
+ private void readConfigEntries(JSONObject inpConfig,
+ File clientInstallPath,
+ JSONObject globalConfig,
+ String name, String user)
+ throws JSONException {
+ JSONObject globalSection = inpConfig.getJSONObject("global");
+ Iterator it = globalSection.keys();
+ while (it.hasNext()) {
+ String key = (String) it.next();
+ String value = globalSection.getString(key);
+ if (SliderUtils.isSet(value)) {
+ value = value.replace("{app_install_dir}", clientInstallPath.getAbsolutePath());
+ value = value.replace("{app_user}", user);
+ if (name != null) {
+ value = value.replace("{app_name}", name);
+ }
+ }
+ if (globalConfig.has(key)) {
+ // last one wins
+ globalConfig.remove(key);
+ }
+ globalConfig.put(key, value);
+ }
+ }
+
+ private void extractFile(ZipInputStream zipInputStream, String filePath) throws IOException {
+ BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(filePath));
+ try {
+ byte[] bytesRead = new byte[4096];
+ int read = 0;
+ while ((read = zipInputStream.read(bytesRead)) != -1) {
+ output.write(bytesRead, 0, read);
+ }
+ } finally {
+ output.close();
+ }
+ }
+
+ private Metainfo getMetainfo(SliderFileSystem fs, String appDef) {
+ Metainfo metaInfo = metaInfoMap.get(appDef);
+ if (fs != null && metaInfo == null) {
+ try {
+ metaInfo = AgentUtils.getApplicationMetainfo(fs, appDef, false);
+ metaInfoMap.put(appDef, metaInfo);
+ } catch (IOException ioe) {
+ // Ignore missing metainfo file for now
+ log.info("Missing metainfo {}", ioe.getMessage());
+ } catch (BadConfigException bce) {
+ log.info("Bad Configuration {}", bce.getMessage());
+ }
+ }
+ return metaInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
new file mode 100644
index 0000000..01a3f1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+/*
+
+ */
+public interface AgentKeys {
+
+ String PROVIDER_AGENT = "agent";
+ String ROLE_NODE = "echo";
+
+ /**
+ * Template stored in the slider classpath -to use if there is
+ * no site-specific template
+ * {@value}
+ */
+ String CONF_RESOURCE = "org/apache/slider/providers/agent/conf/";
+ /* URL to talk back to Agent Controller*/
+ String CONTROLLER_URL = "agent.controller.url";
+ /**
+ * The location of pre-installed agent path.
+ * This can be also be dynamically computed based on Yarn installation of agent.
+ */
+ String PACKAGE_PATH = "agent.package.root";
+ /**
+ * The location of the script implementing the command.
+ */
+ String SCRIPT_PATH = "agent.script";
+ /**
+ * Execution home for the agent.
+ */
+ String APP_HOME = "app.home";
+ String APP_ROOT = "site.global.app_root";
+ String APP_CLIENT_ROOT = "client_root";
+ /**
+ * Runas user of the application
+ */
+ String RUNAS_USER = "site.global.app_user";
+ /**
+ * Name of the service.
+ */
+ String SERVICE_NAME = "app.name";
+ String ARG_LABEL = "--label";
+ String ARG_HOST = "--host";
+ String ARG_PORT = "--port";
+ String ARG_SECURED_PORT = "--secured_port";
+ String ARG_ZOOKEEPER_QUORUM = "--zk-quorum";
+ String ARG_ZOOKEEPER_REGISTRY_PATH = "--zk-reg-path";
+ String ARG_DEBUG = "--debug";
+ String AGENT_MAIN_SCRIPT_ROOT = "./infra/agent/slider-agent/";
+ String AGENT_JINJA2_ROOT = "./infra/agent/slider-agent/jinja2";
+ String AGENT_MAIN_SCRIPT = "agent/main.py";
+
+ String APP_DEF = "application.def";
+ String ADDON_PREFIX = "application.addon.";
+ String ADDONS = "application.addons";
+ String AGENT_VERSION = "agent.version";
+ String AGENT_CONF = "agent.conf";
+ String ADDON_FOR_ALL_COMPONENTS = "ALL";
+
+ String APP_RESOURCES = "application.resources";
+ String APP_RESOURCES_DIR = "app/resources";
+
+ String APP_CONF_DIR = "app/conf";
+
+ String AGENT_INSTALL_DIR = "infra/agent";
+ String APP_DEFINITION_DIR = "app/definition";
+ String ADDON_DEFINITION_DIR = "addon/definition";
+ String AGENT_CONFIG_FILE = "infra/conf/agent.ini";
+ String AGENT_VERSION_FILE = "infra/version";
+ String APP_PACKAGES_DIR = "app/packages";
+ String PER_COMPONENT = "per.component";
+ String PER_GROUP = "per.group";
+
+ String JAVA_HOME = "java_home";
+ String PACKAGE_LIST = "package_list";
+ String SYSTEM_CONFIGS = "system_configs";
+ String WAIT_HEARTBEAT = "wait.heartbeat";
+ String PYTHON_EXE = "python";
+ String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node";
+ String HEARTBEAT_MONITOR_INTERVAL = "heartbeat.monitor.interval";
+ String AGENT_INSTANCE_DEBUG_DATA = "agent.instance.debug.data";
+ String AGENT_OUT_FILE = "slider-agent.out";
+ String KEY_AGENT_TWO_WAY_SSL_ENABLED = "ssl.server.client.auth";
+ String INFRA_RUN_SECURITY_DIR = "infra/run/security/";
+ String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt";
+ String KEY_CONTAINER_LAUNCH_DELAY = "container.launch.delay.sec";
+ String TEST_RELAX_VERIFICATION = "test.relax.validation";
+ String AM_CONFIG_GENERATION = "am.config.generation";
+}
+
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java
new file mode 100644
index 0000000..18c6374
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+class AgentLaunchParameter {
+ public static final Logger log =
+ LoggerFactory.getLogger(AgentLaunchParameter.class);
+ private static final String DEFAULT_PARAMETER = "";
+ private static final String ANY_COMPONENT = "ANY";
+ private static final String NONE_VALUE = "NONE";
+ private final Map<String, CommandTracker> launchParameterTracker;
+
+ public AgentLaunchParameter(String parameters) {
+ launchParameterTracker = parseExpectedLaunchParameters(parameters);
+ }
+
+ /**
+ * Get command for the component type
+ *
+ * @param componentGroup
+ *
+ * @return
+ */
+ public String getNextLaunchParameter(String componentGroup) {
+ if (launchParameterTracker != null) {
+ if (launchParameterTracker.containsKey(componentGroup)
+ || launchParameterTracker.containsKey(ANY_COMPONENT)) {
+ synchronized (this) {
+ CommandTracker indexTracker = null;
+ if (launchParameterTracker.containsKey(componentGroup)) {
+ indexTracker = launchParameterTracker.get(componentGroup);
+ } else {
+ indexTracker = launchParameterTracker.get(ANY_COMPONENT);
+ }
+
+ return indexTracker.getNextCommand();
+ }
+ }
+ }
+
+ return DEFAULT_PARAMETER;
+ }
+
+ /**
+ * Parse launch parameters of the form ANY:PARAM_FOR_FIRST:PARAM_FOR_SECOND:...:PARAM_FOR_REST|HBASE_MASTER:...
+ *
+ * E.g. ANY:DO_NOT_REGISTER:DO_NOT_HEARTBEAT:NONE For any container, first one gets DO_NOT_REGISTER second one gets
+ * DO_NOT_HEARTBEAT, then all of the rest get nothing
+ *
+ * E.g. HBASE_MASTER:FAIL_AFTER_START:NONE For HBASE_MASTER, first one gets FAIL_AFTER_START then "" for all
+ *
+ * @param launchParameters
+ *
+ * @return
+ */
+ Map<String, CommandTracker> parseExpectedLaunchParameters(String launchParameters) {
+ Map<String, CommandTracker> trackers = null;
+ if (launchParameters != null && launchParameters.length() > 0) {
+ String[] componentSpecificParameters = launchParameters.split(Pattern.quote("|"));
+ for (String componentSpecificParameter : componentSpecificParameters) {
+ if (componentSpecificParameter.length() != 0) {
+ String[] parameters = componentSpecificParameter.split(Pattern.quote(":"));
+
+ if (parameters.length > 1 && parameters[0].length() > 0) {
+
+ for (int index = 1; index < parameters.length; index++) {
+ if (parameters[index].equals(NONE_VALUE)) {
+ parameters[index] = DEFAULT_PARAMETER;
+ }
+ }
+
+ if (trackers == null) {
+ trackers = new HashMap<String, CommandTracker>(10);
+ }
+ String componentName = parameters[0];
+ CommandTracker tracker = new CommandTracker(Arrays.copyOfRange(parameters, 1, parameters.length));
+ trackers.put(componentName, tracker);
+ }
+ }
+ }
+ }
+
+ return trackers;
+ }
+
+ class CommandTracker {
+ private final int maxIndex;
+ private final String[] launchCommands;
+ private int currentIndex;
+
+ CommandTracker(String[] launchCommands) {
+ this.currentIndex = 0;
+ this.maxIndex = launchCommands.length - 1;
+ this.launchCommands = launchCommands;
+ }
+
+ String getNextCommand() {
+ String retVal = launchCommands[currentIndex];
+ if (currentIndex != maxIndex) {
+ currentIndex++;
+ }
+
+ return retVal;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java
new file mode 100644
index 0000000..d5ca749
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.ProviderService;
+import org.apache.slider.providers.SliderProviderFactory;
+
+public class AgentProviderFactory extends SliderProviderFactory {
+
+ public static final String CLASSNAME =
+ "org.apache.slider.providers.agent.AgentProviderFactory";
+
+ public AgentProviderFactory() {
+ }
+
+ public AgentProviderFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public AbstractClientProvider createClientProvider() {
+ return new AgentClientProvider(getConf());
+ }
+
+ @Override
+ public ProviderService createServerProvider() {
+ return new AgentProviderService();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org