You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/07/21 18:39:14 UTC
[12/52] [abbrv] hadoop git commit: YARN-6255. Refactor
yarn-native-services framework. Contributed by Jian He
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c242ac2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 00e2b62..f4ea70b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -20,7 +20,6 @@ package org.apache.slider.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -35,7 +34,6 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
@@ -55,40 +53,44 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
-import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.SliderApplicationApi;
import org.apache.slider.api.SliderClusterProtocol;
-import org.apache.slider.api.StateValues;
import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.Component;
import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.api.types.NodeInformationList;
-import org.apache.slider.api.types.SliderInstanceDescription;
import org.apache.slider.client.ipc.SliderApplicationIpcClient;
import org.apache.slider.client.ipc.SliderClusterOperations;
import org.apache.slider.common.Constants;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.params.AbstractActionArgs;
import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
import org.apache.slider.common.params.ActionAMSuicideArgs;
import org.apache.slider.common.params.ActionClientArgs;
import org.apache.slider.common.params.ActionCreateArgs;
import org.apache.slider.common.params.ActionDependencyArgs;
-import org.apache.slider.common.params.ActionDestroyArgs;
import org.apache.slider.common.params.ActionDiagnosticArgs;
import org.apache.slider.common.params.ActionEchoArgs;
import org.apache.slider.common.params.ActionExistsArgs;
@@ -113,20 +115,13 @@ import org.apache.slider.common.params.ActionUpgradeArgs;
import org.apache.slider.common.params.Arguments;
import org.apache.slider.common.params.ClientArgs;
import org.apache.slider.common.params.CommonArgs;
-import org.apache.slider.common.params.LaunchArgsAccessor;
import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.Duration;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
-import org.apache.slider.core.buildutils.InstanceBuilder;
import org.apache.slider.core.buildutils.InstanceIO;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
-import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
@@ -137,18 +132,13 @@ import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
import org.apache.slider.core.exceptions.UsageException;
import org.apache.slider.core.exceptions.WaitTimeoutException;
-import org.apache.slider.core.launch.AppMasterLauncher;
import org.apache.slider.core.launch.ClasspathConstructor;
import org.apache.slider.core.launch.CredentialUtils;
import org.apache.slider.core.launch.JavaCommandLineBuilder;
-import org.apache.slider.core.launch.LaunchedApplication;
import org.apache.slider.core.launch.SerializedApplicationReport;
import org.apache.slider.core.main.RunService;
-import org.apache.slider.core.persist.AppDefinitionPersister;
import org.apache.slider.core.persist.ApplicationReportSerDeser;
-import org.apache.slider.core.persist.ConfPersister;
import org.apache.slider.core.persist.JsonSerDeser;
-import org.apache.slider.core.persist.LockAcquireFailedException;
import org.apache.slider.core.registry.SliderRegistryUtils;
import org.apache.slider.core.registry.YarnAppListClient;
import org.apache.slider.core.registry.docstore.ConfigFormat;
@@ -160,19 +150,19 @@ import org.apache.slider.core.registry.docstore.PublishedExportsSet;
import org.apache.slider.core.registry.retrieve.RegistryRetriever;
import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.core.zk.ZKIntegration;
-import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
-import org.apache.slider.providers.docker.DockerClientProvider;
-import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.slider.util.ServiceApiUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -182,7 +172,6 @@ import java.io.ByteArrayOutputStream;
import java.io.Console;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -191,10 +180,7 @@ import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -213,14 +199,11 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
-import static org.apache.slider.api.InternalKeys.*;
-import static org.apache.slider.api.OptionKeys.*;
-import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.api.InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH;
import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
import static org.apache.slider.common.params.SliderActions.*;
import static org.apache.slider.common.tools.SliderUtils.*;
-
/**
* Client service for Slider
*/
@@ -246,6 +229,9 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
public static final String E_PACKAGE_EXISTS = "Package exists";
private static PrintStream clientOutputStream = System.out;
+ private static final JsonSerDeser<Application> jsonSerDeser =
+ new JsonSerDeser<Application>(Application.class,
+ PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
// value should not be changed without updating string find in slider.py
private static final String PASSWORD_PROMPT = "Enter password for";
@@ -362,16 +348,22 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
serviceArgs.getActionAMSuicideArgs());
break;
- case ACTION_BUILD:
- exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
- break;
-
case ACTION_CLIENT:
exitCode = actionClient(serviceArgs.getActionClientArgs());
break;
case ACTION_CREATE:
- exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+ ActionCreateArgs args = serviceArgs.getActionCreateArgs();
+ File file = args.getAppDef();
+ Path filePath = new Path(file.getAbsolutePath());
+ log.info("Loading app definition from: " + filePath);
+ Application application =
+ jsonSerDeser.load(FileSystem.getLocal(getConfig()), filePath);
+ if(args.lifetime > 0) {
+ application.setLifetime(args.lifetime);
+ }
+ application.setName(clusterName);
+ actionCreate(application);
break;
case ACTION_DEPENDENCY:
@@ -379,7 +371,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
break;
case ACTION_DESTROY:
- exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs());
+ actionDestroy(clusterName);
break;
case ACTION_DIAGNOSTICS:
@@ -392,11 +384,11 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
break;
case ACTION_FLEX:
- exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+ actionFlex(clusterName, serviceArgs.getActionFlexArgs());
break;
- case ACTION_FREEZE:
- exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
+ case ACTION_STOP:
+ actionStop(clusterName, serviceArgs.getActionFreezeArgs());
break;
case ACTION_HELP:
@@ -456,8 +448,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs());
break;
- case ACTION_THAW:
- exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+ case ACTION_START:
+ exitCode = actionStart(clusterName, serviceArgs.getActionThawArgs());
break;
case ACTION_TOKENS:
@@ -516,7 +508,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
Exception e = null;
try {
- Configuration config = getConfig();
ZKIntegration client = getZkClient(clusterName, user);
if (client != null) {
if (client.exists(zkPath)) {
@@ -627,76 +618,31 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
* force=true by default.
*/
@Override
- public int actionDestroy(String clustername) throws YarnException,
- IOException {
- ActionDestroyArgs destroyArgs = new ActionDestroyArgs();
- destroyArgs.force = true;
- return actionDestroy(clustername, destroyArgs);
- }
-
- @Override
- public int actionDestroy(String clustername,
- ActionDestroyArgs destroyArgs) throws YarnException, IOException {
- // verify that a live cluster isn't there
- validateClusterName(clustername);
- //no=op, it is now mandatory.
- verifyBindingsDefined();
- verifyNoLiveClusters(clustername, "Destroy");
- boolean forceDestroy = destroyArgs.force;
- log.debug("actionDestroy({}, force={})", clustername, forceDestroy);
-
- // create the directory path
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
- // delete the directory;
+ public void actionDestroy(String appName)
+ throws YarnException, IOException {
+ // Removes the app's HDFS directory (if present) and then its ZooKeeper node;
+ // either failure is reported to the caller as a YarnException.
+ validateClusterName(appName);
+ // Same guard actionCreate uses: do not tear down the persisted state of an
+ // application that is still live in YARN.
+ verifyNoLiveApp(appName, "Destroy");
+ Path appDir = sliderFileSystem.buildClusterDirPath(appName);
FileSystem fs = sliderFileSystem.getFileSystem();
- boolean exists = fs.exists(clusterDirectory);
- if (exists) {
- log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
- if (!forceDestroy) {
- // fail the command if --force is not explicitly specified
- throw new UsageException("Destroy will permanently delete directories and registries. "
- + "Reissue this command with the --force option if you want to proceed.");
- }
- if (!fs.delete(clusterDirectory, true)) {
- log.warn("Filesystem returned false from delete() operation");
- }
-
- if(!deleteZookeeperNode(clustername)) {
- log.warn("Unable to perform node cleanup in Zookeeper.");
- }
-
- if (fs.exists(clusterDirectory)) {
- log.warn("Failed to delete {}", clusterDirectory);
+ if (fs.exists(appDir)) {
+ if (fs.delete(appDir, true)) {
+ // No early return: the ZooKeeper node below must be removed as well,
+ // otherwise a successful HDFS delete would leave the ZK node behind.
+ log.info("Successfully deleted application " + appName);
+ } else {
+ String message =
+ "Failed to delete application " + appName + " at: " + appDir;
+ log.warn(message);
+ throw new YarnException(message);
}
-
- } else {
- log.debug("Application Instance {} already destroyed", clustername);
- }
-
- // rm the registry entry —do not let this block the destroy operations
- String registryPath = SliderRegistryUtils.registryPathForInstance(
- clustername);
- try {
- getRegistryOperations().delete(registryPath, true);
- } catch (IOException e) {
- log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
- } catch (SliderException e) {
- log.warn("Error binding to registry {} ", e, e);
}
-
- List<ApplicationReport> instances = findAllLiveInstances(clustername);
- // detect any race leading to cluster creation during the check/destroy process
- // and report a problem.
- if (!instances.isEmpty()) {
- throw new SliderException(EXIT_APPLICATION_IN_USE,
- clustername + ": "
- + E_DESTROY_CREATE_RACE_CONDITION
- + " :" +
- instances.get(0));
+ if (!deleteZookeeperNode(appName)) {
+ String message =
+ "Failed to clean up application " + appName + " in zookeeper";
+ log.warn(message);
+ throw new YarnException(message);
}
- log.info("Destroyed cluster {}", clustername);
- return EXIT_SUCCESS;
+ //TODO clean registry
}
+
@Override
public int actionAmSuicide(String clustername,
@@ -715,203 +661,285 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
return factory.createClientProvider();
}
- /**
- * Create the cluster -saving the arguments to a specification file first
- * @param clustername cluster name
- * @return the status code
- * @throws YarnException Yarn problems
- * @throws IOException other problems
- * @throws BadCommandArgumentsException bad arguments.
- */
- public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
- YarnException,
- IOException {
-
- actionBuild(clustername, createArgs);
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
- AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
- clustername, clusterDirectory);
- try {
- checkForCredentials(getConfig(), instanceDefinition.getAppConf(),
- clustername);
- } catch (IOException e) {
- sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
- throw e;
- }
- return startCluster(clustername, createArgs, createArgs.lifetime);
- }
-
- @Override
- public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
- throws YarnException, IOException {
- File template = upgradeArgs.template;
- File resources = upgradeArgs.resources;
- List<String> containers = upgradeArgs.containers;
- List<String> components = upgradeArgs.components;
-
- // For upgrade spec, let's be little more strict with validation. If either
- // --template or --resources is specified, then both needs to be specified.
- // Otherwise the internal app config and resources states of the app will be
- // unwantedly modified and the change will take effect to the running app
- // immediately.
- require(!(template != null && resources == null),
- "Option %s must be specified with option %s",
- Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
-
- require(!(resources != null && template == null),
- "Option %s must be specified with option %s",
- Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
-
- // For upgrade spec, both --template and --resources should be specified
- // and neither of --containers or --components should be used
- if (template != null && resources != null) {
- require(CollectionUtils.isEmpty(containers),
- "Option %s cannot be specified with %s or %s",
- Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
- Arguments.ARG_RESOURCES);
- require(CollectionUtils.isEmpty(components),
- "Option %s cannot be specified with %s or %s",
- Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
- Arguments.ARG_RESOURCES);
-
- // not an error to try to upgrade a stopped cluster, just return success
- // code, appropriate log messages have already been dumped
- if (!isAppInRunningState(clustername)) {
- return EXIT_SUCCESS;
- }
- // Now initiate the upgrade spec flow
- buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
- SliderClusterOperations clusterOperations = createClusterOperations(clustername);
- clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000);
- return EXIT_SUCCESS;
- }
+ public ApplicationId actionCreate(Application application)
+ throws IOException, YarnException {
+ ServiceApiUtil.validateApplicationPostPayload(application);
+ String appName = application.getName();
+ validateClusterName(appName);
+ verifyNoLiveApp(appName, "Create");
+ Path appDir = checkAppNotExistOnHdfs(application);
- // Since neither --template or --resources were specified, it is upgrade
- // containers flow. Here any one or both of --containers and --components
- // can be specified. If a container is specified with --containers option
- // and also belongs to a component type specified with --components, it will
- // be upgraded only once.
- return actionUpgradeContainers(clustername, upgradeArgs);
+ ApplicationId appId = submitApp(application);
+ application.setId(appId.toString());
+ // write app definition on to hdfs
+ persistApp(appDir, application);
+ return appId;
+ //TODO deal with registry
}
- private int actionUpgradeContainers(String clustername,
- ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
- verifyBindingsDefined();
- validateClusterName(clustername);
- int waittime = upgradeArgs.getWaittime(); // ignored for now
- String text = "Upgrade containers";
- log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
- text, waittime);
-
- // not an error to try to upgrade a stopped cluster, just return success
- // code, appropriate log messages have already been dumped
- if (!isAppInRunningState(clustername)) {
- return EXIT_SUCCESS;
- }
-
- // Create sets of containers and components to get rid of duplicates and
- // for quick lookup during checks below
- Set<String> containers = new HashSet<>();
- if (upgradeArgs.containers != null) {
- containers.addAll(new ArrayList<>(upgradeArgs.containers));
- }
- Set<String> components = new HashSet<>();
- if (upgradeArgs.components != null) {
- components.addAll(new ArrayList<>(upgradeArgs.components));
- }
-
- // check validity of component names and running containers here
- List<ContainerInformation> liveContainers = getContainers(clustername);
- Set<String> validContainers = new HashSet<>();
- Set<String> validComponents = new HashSet<>();
- for (ContainerInformation liveContainer : liveContainers) {
- boolean allContainersAndComponentsAccountedFor = true;
- if (CollectionUtils.isNotEmpty(containers)) {
- if (containers.contains(liveContainer.containerId)) {
- containers.remove(liveContainer.containerId);
- validContainers.add(liveContainer.containerId);
- }
- allContainersAndComponentsAccountedFor = false;
- }
- if (CollectionUtils.isNotEmpty(components)) {
- if (components.contains(liveContainer.component)) {
- components.remove(liveContainer.component);
- validComponents.add(liveContainer.component);
- }
- allContainersAndComponentsAccountedFor = false;
- }
- if (allContainersAndComponentsAccountedFor) {
- break;
+ private ApplicationId submitApp(Application app)
+ throws IOException, YarnException {
+ String appName = app.getName();
+ Configuration conf = getConfig();
+ Path appRootDir = sliderFileSystem.buildClusterDirPath(app.getName());
+ deployedClusterName = appName;
+
+ YarnClientApplication yarnApp = yarnClient.createApplication();
+ ApplicationSubmissionContext submissionContext =
+ yarnApp.getApplicationSubmissionContext();
+ applicationId = submissionContext.getApplicationId();
+ submissionContext.setKeepContainersAcrossApplicationAttempts(true);
+ if (app.getLifetime() > 0) {
+ Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
+ appTimeout.put(ApplicationTimeoutType.LIFETIME, app.getLifetime());
+ submissionContext.setApplicationTimeouts(appTimeout);
+ }
+ submissionContext.setMaxAppAttempts(conf.getInt(KEY_AM_RESTART_LIMIT, 2));
+
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+
+ // copy local slideram-log4j.properties to hdfs and add to localResources
+ boolean hasSliderAMLog4j =
+ addAMLog4jResource(appName, conf, localResources);
+ // copy jars to hdfs and add to localResources
+ Path tempPath = addJarResource(appName, localResources);
+ // add keytab if in secure env
+ addKeytabResourceIfSecure(sliderFileSystem, localResources, conf, appName);
+ printLocalResources(localResources);
+
+ //TODO SliderAMClientProvider#copyEnvVars
+ //TODO localResource putEnv
+
+ Map<String, String> env = addAMEnv(conf, tempPath);
+
+ // create AM CLI
+ String cmdStr =
+ buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j);
+
+ //TODO set log aggregation context
+ //TODO set retry window
+ submissionContext.setResource(Resource.newInstance(
+ conf.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM), 1));
+ submissionContext.setQueue(conf.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE));
+ submissionContext.setApplicationName(appName);
+ submissionContext.setApplicationType(SliderKeys.APP_TYPE);
+ Set<String> appTags =
+ AbstractClientProvider.createApplicationTags(appName, null, null);
+ if (!appTags.isEmpty()) {
+ submissionContext.setApplicationTags(appTags);
+ }
+ ContainerLaunchContext amLaunchContext =
+ Records.newRecord(ContainerLaunchContext.class);
+ amLaunchContext.setCommands(Collections.singletonList(cmdStr));
+ amLaunchContext.setEnvironment(env);
+ amLaunchContext.setLocalResources(localResources);
+ addCredentialsIfSecure(conf, amLaunchContext);
+ submissionContext.setAMContainerSpec(amLaunchContext);
+ yarnClient.submitApplication(submissionContext);
+ return submissionContext.getApplicationId();
+ }
+
+ private void printLocalResources(Map<String, LocalResource> map) {
+ log.info("Added LocalResource for localization: ");
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry<String, LocalResource> entry : map.entrySet()) {
+ builder.append(entry.getKey()).append(" -> ")
+ .append(entry.getValue().getResource().getFile())
+ .append(System.lineSeparator());
+ }
+ log.info(builder.toString());
+ }
+
+ private void addCredentialsIfSecure(Configuration conf,
+ ContainerLaunchContext amLaunchContext) throws IOException {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // pick up oozie credentials
+ Credentials credentials =
+ CredentialUtils.loadTokensFromEnvironment(System.getenv(), conf);
+ if (credentials == null) {
+ // nothing from oozie, so build up directly
+ credentials = new Credentials(
+ UserGroupInformation.getCurrentUser().getCredentials());
+ CredentialUtils.addRMRenewableFSDelegationTokens(conf,
+ sliderFileSystem.getFileSystem(), credentials);
+ } else {
+ log.info("Using externally supplied credentials to launch AM");
}
+ amLaunchContext.setTokens(CredentialUtils.marshallCredentials(credentials));
}
+ }
- // If any item remains in containers or components then they are invalid.
- // Log warning for them and proceed.
- if (CollectionUtils.isNotEmpty(containers)) {
- log.warn("Invalid set of containers provided {}", containers);
- }
- if (CollectionUtils.isNotEmpty(components)) {
- log.warn("Invalid set of components provided {}", components);
+ private String buildCommandLine(String appName, Configuration conf,
+ Path appRootDir, boolean hasSliderAMLog4j) throws BadConfigException {
+ JavaCommandLineBuilder CLI = new JavaCommandLineBuilder();
+ CLI.forceIPv4().headless();
+ //TODO CLI.setJVMHeap
+ //TODO CLI.addJVMOPTS
+ if (hasSliderAMLog4j) {
+ CLI.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
+ CLI.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
}
-
- // If not a single valid container or component is specified do not proceed
- if (CollectionUtils.isEmpty(validContainers)
- && CollectionUtils.isEmpty(validComponents)) {
- log.error("Not a single valid container or component specified. Nothing to do.");
- return EXIT_NOT_FOUND;
+ CLI.add(SliderAppMaster.SERVICE_CLASSNAME);
+ CLI.add(ACTION_CREATE, appName);
+ //TODO debugAM CLI.add(Arguments.ARG_DEBUG)
+ CLI.add(Arguments.ARG_CLUSTER_URI, appRootDir.toUri());
+// InetSocketAddress rmSchedulerAddress = getRmSchedulerAddress(conf);
+// String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
+// CLI.add(Arguments.ARG_RM_ADDR, rmAddr);
+ // pass the registry binding
+ CLI.addConfOptionToCLI(conf, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+ RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+ CLI.addMandatoryConfOption(conf, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+ if(isHadoopClusterSecure(conf)) {
+ //TODO Is this required ??
+ // if the cluster is secure, make sure that
+ // the relevant security settings go over
+ CLI.addConfOption(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
}
+// // copy over any/all YARN RM client values, in case the server-side XML conf file
+// // has the 0.0.0.0 address
+// CLI.addConfOptions(conf, YarnConfiguration.RM_ADDRESS,
+// YarnConfiguration.RM_CLUSTER_ID, YarnConfiguration.RM_HOSTNAME,
+// YarnConfiguration.RM_PRINCIPAL);
- SliderClusterProtocol appMaster = connect(findInstance(clustername));
- Messages.UpgradeContainersRequestProto r =
- Messages.UpgradeContainersRequestProto
- .newBuilder()
- .setMessage(text)
- .addAllContainer(validContainers)
- .addAllComponent(validComponents)
- .build();
- appMaster.upgradeContainers(r);
- log.info("Cluster upgrade issued for -");
- if (CollectionUtils.isNotEmpty(validContainers)) {
- log.info(" Containers (total {}): {}", validContainers.size(),
- validContainers);
- }
- if (CollectionUtils.isNotEmpty(validComponents)) {
- log.info(" Components (total {}): {}", validComponents.size(),
- validComponents);
- }
+ // write out the path output
+ CLI.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
+ String cmdStr = CLI.build();
+ log.info("Completed setting up app master command: {}", cmdStr);
+ return cmdStr;
+ }
- return EXIT_SUCCESS;
+ private Map<String, String> addAMEnv(Configuration conf, Path tempPath)
+ throws IOException {
+ Map<String, String> env = new HashMap<String, String>();
+ ClasspathConstructor classpath =
+ buildClasspath(SliderKeys.SUBMITTED_CONF_DIR, "lib",
+ sliderFileSystem, getUsingMiniMRCluster());
+ env.put("CLASSPATH", classpath.buildClasspath());
+ env.put("LANG", "en_US.UTF-8");
+ env.put("LC_ALL", "en_US.UTF-8");
+ env.put("LANGUAGE", "en_US.UTF-8");
+ String jaas = System.getenv(HADOOP_JAAS_DEBUG);
+ if (jaas != null) {
+ env.put(HADOOP_JAAS_DEBUG, jaas);
+ }
+ env.putAll(getAmLaunchEnv(conf));
+ log.info("AM env: \n{}", stringifyMap(env));
+ return env;
+ }
+
+ private Path addJarResource(String appName,
+ Map<String, LocalResource> localResources)
+ throws IOException, SliderException {
+ Path libPath = sliderFileSystem.buildClusterDirPath(appName);
+ ProviderUtils
+ .addProviderJar(localResources, SliderAppMaster.class, SLIDER_JAR,
+ sliderFileSystem, libPath, "lib", false);
+ Path dependencyLibTarGzip = sliderFileSystem.getDependencyTarGzip();
+ if (sliderFileSystem.isFile(dependencyLibTarGzip)) {
+ log.info("Loading lib tar from " + sliderFileSystem.getFileSystem()
+ .getScheme() + ": " + dependencyLibTarGzip);
+ SliderUtils.putAmTarGzipAndUpdate(localResources, sliderFileSystem);
+ } else {
+ String[] libs = SliderUtils.getLibDirs();
+ log.info("Loading dependencies from local file system: " + Arrays
+ .toString(libs));
+ for (String libDirProp : libs) {
+ ProviderUtils
+ .addAllDependencyJars(localResources, sliderFileSystem, libPath,
+ "lib", libDirProp);
+ }
+ }
+ return libPath;
+ }
+
+ private boolean addAMLog4jResource(String appName, Configuration conf,
+ Map<String, LocalResource> localResources)
+ throws IOException, BadClusterStateException {
+ boolean hasSliderAMLog4j = false;
+ String hadoopConfDir =
+ System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
+ if (hadoopConfDir != null) {
+ File localFile =
+ new File(hadoopConfDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+ if (localFile.exists()) {
+ Path localFilePath = createLocalPath(localFile);
+ Path appDirPath = sliderFileSystem.buildClusterDirPath(appName);
+ Path remoteConfPath =
+ new Path(appDirPath, SliderKeys.SUBMITTED_CONF_DIR);
+ Path remoteFilePath =
+ new Path(remoteConfPath, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+ copy(conf, localFilePath, remoteFilePath);
+ LocalResource localResource = sliderFileSystem
+ .createAmResource(remoteConfPath, LocalResourceType.FILE);
+ localResources.put(localFilePath.getName(), localResource);
+ hasSliderAMLog4j = true;
+ }
+ }
+ return hasSliderAMLog4j;
+ }
+
+ private Path checkAppNotExistOnHdfs(Application application)
+ throws IOException, SliderException {
+ Path appDir = sliderFileSystem.buildClusterDirPath(application.getName());
+ sliderFileSystem.verifyDirectoryNonexistent(
+ new Path(appDir, application.getName() + ".json"));
+ return appDir;
}
- // returns true if and only if app is in RUNNING state
- private boolean isAppInRunningState(String clustername) throws YarnException,
- IOException {
- // is this actually a known cluster?
- sliderFileSystem.locateInstanceDefinition(clustername);
- ApplicationReport app = findInstance(clustername);
- if (app == null) {
- // exit early
- log.info("Cluster {} not running", clustername);
- return false;
- }
- log.debug("App to upgrade was found: {}:\n{}", clustername,
- new OnDemandReportStringifier(app));
- if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
- log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.",
- clustername, app.getYarnApplicationState(), ACTION_UPDATE);
- return false;
+ /**
+ * Persists the application definition as {@code <name>.json} inside
+ * {@code appDir}, creating {@code appDir} with restricted permissions first.
+ */
+ private void persistApp(Path appDir, Application application)
+ throws IOException, SliderException {
+ // 750 = owner rwx, group r-x, others nothing. This directory also holds the
+ // AM jars and submitted conf, so it must never be world-writable (777).
+ FsPermission appDirPermission = new FsPermission("750");
+ sliderFileSystem.createWithPermissions(appDir, appDirPermission);
+ Path appJson = new Path(appDir, application.getName() + ".json");
+ jsonSerDeser
+ .save(sliderFileSystem.getFileSystem(), appJson, application, true);
+ log.info("Persisted application {} at {}", application.getName(), appJson);
+ }
+
+ private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
+ Map<String, LocalResource> localResource, Configuration conf,
+ String appName) throws IOException, BadConfigException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
}
-
- // IPC request to upgrade containers is possible if the app is running.
- if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING
- .ordinal()) {
- log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
- + "to be RUNNING.", clustername, app.getYarnApplicationState());
- return false;
+ String keytabPreInstalledOnHost =
+ conf.get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+ if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
+ String amKeytabName =
+ conf.get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+ String keytabDir = conf.get(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
+ Path keytabPath =
+ fileSystem.buildKeytabPath(keytabDir, amKeytabName, appName);
+ if (fileSystem.getFileSystem().exists(keytabPath)) {
+ LocalResource keytabRes =
+ fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
+ localResource
+ .put(SliderKeys.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
+ log.info("Adding AM keytab on hdfs: " + keytabPath);
+ } else {
+ log.warn("No keytab file was found at {}.", keytabPath);
+ if (conf.getBoolean(KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
+ throw new BadConfigException("No keytab file was found at %s.",
+ keytabPath);
+ } else {
+ log.warn("The AM will be "
+ + "started without a kerberos authenticated identity. "
+ + "The application is therefore not guaranteed to remain "
+ + "operational beyond 24 hours.");
+ }
+ }
}
+ }
- return true;
+ @Override
+ public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
+ throws YarnException, IOException {
+ //TODO
+ return 0;
}
protected static void checkForCredentials(Configuration conf,
@@ -952,15 +980,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
}
- private static char[] readOnePassword(String alias) throws IOException {
- Console console = System.console();
- if (console == null) {
- throw new IOException("Unable to input password for " + alias +
- " because System.console() is null");
- }
- return readPassword(alias, console);
- }
-
private static char[] readPassword(String alias, Console console)
throws IOException {
char[] cred = null;
@@ -987,16 +1006,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
@Override
- public int actionBuild(String clustername,
- AbstractClusterBuildingActionArgs buildInfo) throws
- YarnException,
- IOException {
-
- buildInstanceDefinition(clustername, buildInfo, false, false);
- return EXIT_SUCCESS;
- }
-
- @Override
public int actionKeytab(ActionKeytabArgs keytabInfo)
throws YarnException, IOException {
if (keytabInfo.install) {
@@ -1527,12 +1536,12 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
if (buildInfo.lifetime > 0) {
updateLifetime(clustername, buildInfo.lifetime);
} else {
- buildInstanceDefinition(clustername, buildInfo, true, true);
+ //TODO upgrade
}
return EXIT_SUCCESS;
}
- public void updateLifetime(String appName, long lifetime)
+ public String updateLifetime(String appName, long lifetime)
throws YarnException, IOException {
EnumSet<YarnApplicationState> appStates = EnumSet.range(
YarnApplicationState.NEW, YarnApplicationState.RUNNING);
@@ -1553,396 +1562,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
log.info("Successfully updated lifetime for an application: appName = "
+ appName + ", appId = " + appId
+ ". New expiry time in ISO8601 format is " + newTimeout);
- }
-
- /**
- * Build up the AggregateConfiguration for an application instance then
- * persists it
- * @param clustername name of the cluster
- * @param buildInfo the arguments needed to build the cluster
- * @param overwrite true if existing cluster directory can be overwritten
- * @param liveClusterAllowed true if live cluster can be modified
- * @throws YarnException
- * @throws IOException
- */
-
- public void buildInstanceDefinition(String clustername,
- AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
- boolean liveClusterAllowed) throws YarnException, IOException {
- buildInstanceDefinition(clustername, buildInfo, overwrite,
- liveClusterAllowed, false);
- }
-
- public void buildInstanceDefinition(String clustername,
- AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
- boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
- IOException {
- // verify that a live cluster isn't there
- validateClusterName(clustername);
- verifyBindingsDefined();
- if (!liveClusterAllowed) {
- verifyNoLiveClusters(clustername, "Create");
- }
-
- Configuration conf = getConfig();
- String registryQuorum = lookupZKQuorum();
-
- Path appconfdir = buildInfo.getConfdir();
- // Provider
- String providerName = buildInfo.getProvider();
- requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
- log.debug("Provider is {}", providerName);
- SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
- AbstractClientProvider provider =
- createClientProvider(providerName);
- InstanceBuilder builder =
- new InstanceBuilder(sliderFileSystem,
- getConfig(),
- clustername);
-
- AggregateConf instanceDefinition = new AggregateConf();
- ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
- ConfTreeOperations resources = instanceDefinition.getResourceOperations();
- ConfTreeOperations internal = instanceDefinition.getInternalOperations();
- //initial definition is set by the providers
- sliderAM.prepareInstanceConfiguration(instanceDefinition);
- provider.prepareInstanceConfiguration(instanceDefinition);
-
- //load in any specified on the command line
- if (buildInfo.resources != null) {
- try {
- resources.mergeFile(buildInfo.resources,
- new ResourcesInputPropertiesValidator());
-
- } catch (IOException e) {
- throw new BadConfigException(e,
- "incorrect argument to %s: \"%s\" : %s ",
- Arguments.ARG_RESOURCES,
- buildInfo.resources,
- e.toString());
- }
- }
- if (buildInfo.template != null) {
- try {
- appConf.mergeFile(buildInfo.template,
- new TemplateInputPropertiesValidator());
- } catch (IOException e) {
- throw new BadConfigException(e,
- "incorrect argument to %s: \"%s\" : %s ",
- Arguments.ARG_TEMPLATE,
- buildInfo.template,
- e.toString());
- }
- }
-
- if (isUpgradeFlow) {
- ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
- if (!upgradeInfo.force) {
- validateClientAndClusterResource(clustername, resources);
- }
- }
-
- //get the command line options
- ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
- ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
-
- appConf.merge(cmdLineAppOptions);
-
- AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
- appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
-
- // put the role counts into the resources file
- Map<String, String> argsRoleMap = buildInfo.getComponentMap();
- for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
- String count = roleEntry.getValue();
- String key = roleEntry.getKey();
- log.info("{} => {}", key, count);
- resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
- }
-
- //all CLI role options
- Map<String, Map<String, String>> appOptionMap =
- buildInfo.getCompOptionMap();
- appConf.mergeComponents(appOptionMap);
-
- //internal picks up core. values only
- internal.propagateGlobalKeys(appConf, "slider.");
- internal.propagateGlobalKeys(appConf, "internal.");
-
- //copy over role. and yarn. values ONLY to the resources
- if (PROPAGATE_RESOURCE_OPTION) {
- resources.propagateGlobalKeys(appConf, "component.");
- resources.propagateGlobalKeys(appConf, "role.");
- resources.propagateGlobalKeys(appConf, "yarn.");
- resources.mergeComponentsPrefix(appOptionMap, "component.", true);
- resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
- resources.mergeComponentsPrefix(appOptionMap, "role.", true);
- }
-
- // resource component args
- appConf.merge(cmdLineResourceOptions);
- resources.merge(cmdLineResourceOptions);
- resources.mergeComponents(buildInfo.getResourceCompOptionMap());
-
- builder.init(providerName, instanceDefinition);
- builder.resolve();
- builder.propagateFilename();
- builder.propagatePrincipals();
- builder.setImageDetailsIfAvailable(buildInfo.getImage(),
- buildInfo.getAppHomeDir());
- builder.setQueue(buildInfo.queue);
-
- String quorum = buildInfo.getZKhosts();
- if (isUnset(quorum)) {
- quorum = registryQuorum;
- }
- if (isUnset(quorum)) {
- throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
- }
- ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
- getUsername(),
- clustername,
- registryQuorum,
- quorum);
- String zookeeperRoot = buildInfo.getAppZKPath();
-
- if (isSet(zookeeperRoot)) {
- zkPaths.setAppPath(zookeeperRoot);
- } else {
- String createDefaultZkNode = appConf.getGlobalOptions()
- .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
- if (createDefaultZkNode.equals("true")) {
- String defaultZKPath = createZookeeperNode(clustername, false);
- log.debug("ZK node created for application instance: {}", defaultZKPath);
- if (defaultZKPath != null) {
- zkPaths.setAppPath(defaultZKPath);
- }
- } else {
- // create AppPath if default is being used
- String defaultZKPath = createZookeeperNode(clustername, true);
- log.debug("ZK node assigned to application instance: {}", defaultZKPath);
- zkPaths.setAppPath(defaultZKPath);
- }
- }
-
- builder.addZKBinding(zkPaths);
-
- //then propagate any package URI
- if (buildInfo.packageURI != null) {
- appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
- }
-
- propagatePythonExecutable(conf, instanceDefinition);
-
- // make any substitutions needed at this stage
- replaceTokens(appConf.getConfTree(), getUsername(), clustername);
-
- // TODO: Refactor the validation code and persistence code
- try {
- persistInstanceDefinition(overwrite, appconfdir, builder);
- appDefinitionPersister.persistPackages();
-
- } catch (LockAcquireFailedException e) {
- log.warn("Failed to get a Lock on {} : {}", builder, e, e);
- throw new BadClusterStateException("Failed to save " + clustername
- + ": " + e);
- }
-
- // providers to validate what there is
- // TODO: Validation should be done before persistence
- AggregateConf instanceDescription = builder.getInstanceDescription();
- validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
- validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
- }
-
- private void validateClientAndClusterResource(String clustername,
- ConfTreeOperations clientResources) throws BadClusterStateException,
- SliderException, IOException {
- log.info("Validating upgrade resource definition with current cluster "
- + "state (components and instance count)");
- Map<String, Integer> clientComponentInstances = new HashMap<>();
- for (String componentName : clientResources.getComponentNames()) {
- if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
- clientComponentInstances.put(componentName, clientResources
- .getComponentOptInt(componentName,
- COMPONENT_INSTANCES, -1));
- }
- }
-
- AggregateConf clusterConf = null;
- try {
- clusterConf = loadPersistedClusterDescription(clustername);
- } catch (LockAcquireFailedException e) {
- log.warn("Failed to get a Lock on cluster resource : {}", e, e);
- throw new BadClusterStateException(
- "Failed to load client resource definition " + clustername + ": " + e, e);
- }
- Map<String, Integer> clusterComponentInstances = new HashMap<>();
- for (Map.Entry<String, Map<String, String>> component : clusterConf
- .getResources().components.entrySet()) {
- if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
- clusterComponentInstances.put(
- component.getKey(),
- Integer.decode(component.getValue().get(
- COMPONENT_INSTANCES)));
- }
- }
-
- // client and cluster should be an exact match
- Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances
- .entrySet().iterator();
- while (clientComponentInstanceIt.hasNext()) {
- Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next();
- if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) {
- // compare instance count now and remove from both maps if they match
- if (clusterComponentInstances
- .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry
- .getValue().intValue()) {
- clusterComponentInstances.remove(clientComponentInstanceEntry
- .getKey());
- clientComponentInstanceIt.remove();
- }
- }
- }
-
- if (!clientComponentInstances.isEmpty()
- || !clusterComponentInstances.isEmpty()) {
- log.error("Mismatch found in upgrade resource definition and cluster "
- + "resource state");
- if (!clientComponentInstances.isEmpty()) {
- log.info("The upgrade resource definitions that do not match are:");
- for (Map.Entry<String, Integer> clientComponentInstanceEntry : clientComponentInstances
- .entrySet()) {
- log.info(" Component Name: {}, Instance count: {}",
- clientComponentInstanceEntry.getKey(),
- clientComponentInstanceEntry.getValue());
- }
- }
- if (!clusterComponentInstances.isEmpty()) {
- log.info("The cluster resources that do not match are:");
- for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances
- .entrySet()) {
- log.info(" Component Name: {}, Instance count: {}",
- clusterComponentInstanceEntry.getKey(),
- clusterComponentInstanceEntry.getValue());
- }
- }
- throw new BadConfigException("Resource definition provided for "
- + "upgrade does not match with that of the currently running "
- + "cluster.\nIf you are aware of what you are doing, rerun the "
- + "command with " + Arguments.ARG_FORCE + " option.");
- }
- }
-
- protected void persistInstanceDefinition(boolean overwrite,
- Path appconfdir,
- InstanceBuilder builder)
- throws IOException, SliderException, LockAcquireFailedException {
- builder.persist(appconfdir, overwrite);
- }
-
- @VisibleForTesting
- public static void replaceTokens(ConfTree conf,
- String userName, String clusterName) throws IOException {
- Map<String,String> newglobal = new HashMap<>();
- for (Entry<String,String> entry : conf.global.entrySet()) {
- newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
- userName, clusterName));
- }
- conf.global.putAll(newglobal);
-
- for (String component : conf.components.keySet()) {
- Map<String,String> newComponent = new HashMap<>();
- for (Entry<String,String> entry : conf.components.get(component).entrySet()) {
- newComponent.put(entry.getKey(), replaceTokens(entry.getValue(),
- userName, clusterName));
- }
- conf.components.get(component).putAll(newComponent);
- }
-
- Map<String,List<String>> newcred = new HashMap<>();
- for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
- List<String> resultList = new ArrayList<>();
- for (String v : entry.getValue()) {
- resultList.add(replaceTokens(v, userName, clusterName));
- }
- newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
- resultList);
- }
- conf.credentials.clear();
- conf.credentials.putAll(newcred);
- }
-
- private static String replaceTokens(String s, String userName,
- String clusterName) throws IOException {
- return s.replaceAll(Pattern.quote("${USER}"), userName)
- .replaceAll(Pattern.quote("${USER_NAME}"), userName);
- }
-
- public FsPermission getClusterDirectoryPermissions(Configuration conf) {
- String clusterDirPermsOct =
- conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
- return new FsPermission(clusterDirPermsOct);
- }
-
- /**
- * Verify that the Resource Manager is configured (on a non-HA cluster).
- * with a useful error message
- * @throws BadCommandArgumentsException the exception raised on an invalid config
- */
- public void verifyBindingsDefined() throws BadCommandArgumentsException {
- InetSocketAddress rmAddr = getRmAddress(getConfig());
- if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false)
- && !isAddressDefined(rmAddr)) {
- throw new BadCommandArgumentsException(
- E_NO_RESOURCE_MANAGER
- + " in the argument "
- + Arguments.ARG_MANAGER
- + " or the configuration property "
- + YarnConfiguration.RM_ADDRESS
- + " value :" + rmAddr);
- }
- }
-
- /**
- * Load and start a cluster specification.
- * This assumes that all validation of args and cluster state
- * have already taken place
- *
- * @param clustername name of the cluster.
- * @param launchArgs launch arguments
- * @param lifetime
- * @return the exit code
- * @throws YarnException
- * @throws IOException
- */
- protected int startCluster(String clustername, LaunchArgsAccessor launchArgs,
- long lifetime) throws YarnException, IOException {
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
- AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
- clustername,
- clusterDirectory);
-
- LaunchedApplication launchedApplication =
- launchApplication(clustername, clusterDirectory, instanceDefinition,
- serviceArgs.isDebug(), lifetime);
-
- if (launchArgs.getOutputFile() != null) {
- // output file has been requested. Get the app report and serialize it
- ApplicationReport report =
- launchedApplication.getApplicationReport();
- SerializedApplicationReport sar = new SerializedApplicationReport(report);
- sar.submitTime = System.currentTimeMillis();
- ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser();
- serDeser.save(sar, launchArgs.getOutputFile());
- }
- int waittime = launchArgs.getWaittime();
- if (waittime > 0) {
- return waitForAppRunning(launchedApplication, waittime, waittime);
- } else {
- // no waiting
- return EXIT_SUCCESS;
- }
+ return newTimeout;
}
/**
@@ -1968,415 +1588,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
}
- /**
- * Load the instance definition.
- * @param name cluster name
- * @param resolved flag to indicate the cluster should be resolved
- * @return the loaded configuration
- * @throws IOException IO problems
- * @throws SliderException slider explicit issues
- * @throws UnknownApplicationInstanceException if the file is not found
- */
- public AggregateConf loadInstanceDefinition(String name,
- boolean resolved) throws
- IOException,
- SliderException {
-
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
- AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
- name,
- clusterDirectory);
- if (resolved) {
- instanceDefinition.resolve();
- }
- return instanceDefinition;
-
- }
-
- protected AppMasterLauncher setupAppMasterLauncher(String clustername,
- Path clusterDirectory, AggregateConf instanceDefinition, boolean debugAM,
- long lifetime)
- throws YarnException, IOException{
- deployedClusterName = clustername;
- validateClusterName(clustername);
- verifyNoLiveClusters(clustername, "Launch");
- Configuration config = getConfig();
- lookupZKQuorum();
- boolean clusterSecure = isHadoopClusterSecure(config);
- //create the Slider AM provider -this helps set up the AM
- SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
-
- instanceDefinition.resolve();
- launchedInstanceDefinition = instanceDefinition;
-
- ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations();
- MapOperations internalOptions = internalOperations.getGlobalOptions();
- ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations();
- ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations();
- Path generatedConfDirPath =
- createPathThatMustExist(internalOptions.getMandatoryOption(
- INTERNAL_GENERATED_CONF_PATH));
- Path snapshotConfPath =
- createPathThatMustExist(internalOptions.getMandatoryOption(
- INTERNAL_SNAPSHOT_CONF_PATH));
-
-
- // cluster Provider
- AbstractClientProvider provider = createClientProvider(
- internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
- if (log.isDebugEnabled()) {
- log.debug(instanceDefinition.toString());
- }
- MapOperations sliderAMResourceComponent =
- resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
- MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
-
- // add the tags if available
- Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
- appOperations, clustername);
-
- Credentials credentials = null;
- if (clusterSecure) {
- // pick up oozie credentials
- credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
- config);
- if (credentials == null) {
- // nothing from oozie, so build up directly
- credentials = new Credentials(
- UserGroupInformation.getCurrentUser().getCredentials());
- CredentialUtils.addRMRenewableFSDelegationTokens(config,
- sliderFileSystem.getFileSystem(),
- credentials);
- CredentialUtils.addRMDelegationToken(yarnClient, credentials);
-
- } else {
- log.info("Using externally supplied credentials to launch AM");
- }
- }
-
- AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
- SliderKeys.APP_TYPE,
- config,
- sliderFileSystem,
- yarnClient,
- clusterSecure,
- sliderAMResourceComponent,
- resourceGlobalOptions,
- applicationTags,
- credentials);
-
- ApplicationId appId = amLauncher.getApplicationId();
- // set the application name;
- amLauncher.setKeepContainersOverRestarts(true);
- // set lifetime in submission context;
- Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
- if (lifetime > 0) {
- appTimeout.put(ApplicationTimeoutType.LIFETIME, lifetime);
- }
- amLauncher.submissionContext.setApplicationTimeouts(appTimeout);
- int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
- amLauncher.setMaxAppAttempts(maxAppAttempts);
-
- sliderFileSystem.purgeAppInstanceTempFiles(clustername);
- Path tempPath = sliderFileSystem.createAppInstanceTempPath(
- clustername,
- appId.toString() + "/am");
- String libdir = "lib";
- Path libPath = new Path(tempPath, libdir);
- sliderFileSystem.getFileSystem().mkdirs(libPath);
- log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath);
-
- // set local resources for the application master
- // local files or archives as needed
- // In this scenario, the jar file for the application master is part of the local resources
- Map<String, LocalResource> localResources = amLauncher.getLocalResources();
-
- // look for the configuration directory named on the command line
- boolean hasServerLog4jProperties = false;
- Path remoteConfPath = null;
- String relativeConfDir = null;
- String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
- if (isUnset(confdirProp)) {
- log.debug("No local configuration directory provided as system property");
- } else {
- File confDir = new File(confdirProp);
- if (!confDir.exists()) {
- throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
- confDir);
- }
- Path localConfDirPath = createLocalPath(confDir);
- remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
- log.debug("Slider configuration directory is {}; remote to be {}",
- localConfDirPath, remoteConfPath);
- copyDirectory(config, localConfDirPath, remoteConfPath, null);
-
- File log4jserver =
- new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
- hasServerLog4jProperties = log4jserver.isFile();
- }
- if (!hasServerLog4jProperties) {
- // check for log4j properties in hadoop conf dir
- String hadoopConfDir = System.getenv(ApplicationConstants.Environment
- .HADOOP_CONF_DIR.name());
- if (hadoopConfDir != null) {
- File localFile = new File(hadoopConfDir, SliderKeys
- .LOG4J_SERVER_PROP_FILENAME);
- if (localFile.exists()) {
- Path localFilePath = createLocalPath(localFile);
- remoteConfPath = new Path(clusterDirectory,
- SliderKeys.SUBMITTED_CONF_DIR);
- Path remoteFilePath = new Path(remoteConfPath, SliderKeys
- .LOG4J_SERVER_PROP_FILENAME);
- copy(config, localFilePath, remoteFilePath);
- hasServerLog4jProperties = true;
- }
- }
- }
- // the assumption here is that minimr cluster => this is a test run
- // and the classpath can look after itself
-
- boolean usingMiniMRCluster = getUsingMiniMRCluster();
- if (!usingMiniMRCluster) {
-
- log.debug("Destination is not a MiniYARNCluster -copying full classpath");
-
- // insert conf dir first
- if (remoteConfPath != null) {
- relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
- Map<String, LocalResource> submittedConfDir =
- sliderFileSystem.submitDirectory(remoteConfPath,
- relativeConfDir);
- mergeMaps(localResources, submittedConfDir);
- }
- }
- // build up the configuration
- // IMPORTANT: it is only after this call that site configurations
- // will be valid.
-
- propagatePrincipals(config, instanceDefinition);
- // validate security data
-
-/*
- // turned off until tested
- SecurityConfiguration securityConfiguration =
- new SecurityConfiguration(config,
- instanceDefinition, clustername);
-
-*/
- Configuration clientConfExtras = new Configuration(false);
- // then build up the generated path.
- FsPermission clusterPerms = getClusterDirectoryPermissions(config);
- copyDirectory(config, snapshotConfPath, generatedConfDirPath,
- clusterPerms);
-
-
- // standard AM resources
- sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
- config,
- amLauncher,
- instanceDefinition,
- snapshotConfPath,
- generatedConfDirPath,
- clientConfExtras,
- libdir,
- tempPath,
- usingMiniMRCluster);
- //add provider-specific resources
- provider.prepareAMAndConfigForLaunch(sliderFileSystem,
- config,
- amLauncher,
- instanceDefinition,
- snapshotConfPath,
- generatedConfDirPath,
- clientConfExtras,
- libdir,
- tempPath,
- usingMiniMRCluster);
-
- // now that the site config is fully generated, the provider gets
- // to do a quick review of them.
- log.debug("Preflight validation of cluster configuration");
-
-
- sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
- clustername,
- config,
- instanceDefinition,
- clusterDirectory,
- generatedConfDirPath,
- clusterSecure
- );
-
- provider.preflightValidateClusterConfiguration(sliderFileSystem,
- clustername,
- config,
- instanceDefinition,
- clusterDirectory,
- generatedConfDirPath,
- clusterSecure
- );
-
-
- if (!(provider instanceof DockerClientProvider)) {
- Path imagePath =
- extractImagePath(sliderFileSystem, internalOptions);
- if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
- log.debug("Registered image path {}", imagePath);
- }
- }
-
- // build the environment
- amLauncher.putEnv(
- buildEnvMap(sliderAMResourceComponent));
- ClasspathConstructor classpath = buildClasspath(relativeConfDir,
- libdir,
- getConfig(),
- sliderFileSystem,
- usingMiniMRCluster);
- amLauncher.setClasspath(classpath);
- //add english env
- amLauncher.setEnv("LANG", "en_US.UTF-8");
- amLauncher.setEnv("LC_ALL", "en_US.UTF-8");
- amLauncher.setEnv("LANGUAGE", "en_US.UTF-8");
- amLauncher.maybeSetEnv(HADOOP_JAAS_DEBUG,
- System.getenv(HADOOP_JAAS_DEBUG));
- amLauncher.putEnv(getAmLaunchEnv(config));
-
- for (Map.Entry<String, String> envs : getSystemEnv().entrySet()) {
- log.debug("System env {}={}", envs.getKey(), envs.getValue());
- }
- if (log.isDebugEnabled()) {
- log.debug("AM classpath={}", classpath);
- log.debug("Environment Map:\n{}",
- stringifyMap(amLauncher.getEnv()));
- log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
- }
-
- // rm address
-
- InetSocketAddress rmSchedulerAddress;
- try {
- rmSchedulerAddress = getRmSchedulerAddress(config);
- } catch (IllegalArgumentException e) {
- throw new BadConfigException("%s Address invalid: %s",
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS));
- }
- String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
-
- JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
- // insert any JVM options);
- sliderAM.addJVMOptions(instanceDefinition, commandLine);
- // enable asserts
- commandLine.enableJavaAssertions();
-
- // if the conf dir has a slideram-log4j.properties, switch to that
- if (hasServerLog4jProperties) {
- commandLine.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
- commandLine.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
- }
-
- // add the AM sevice entry point
- commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);
-
- // create action and the cluster name
- commandLine.add(ACTION_CREATE, clustername);
-
- // debug
- if (debugAM) {
- commandLine.add(Arguments.ARG_DEBUG);
- }
-
- // set the cluster directory path
- commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());
-
- if (!isUnset(rmAddr)) {
- commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
- }
-
- if (serviceArgs.getFilesystemBinding() != null) {
- commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding());
- }
-
- // pass the registry binding
- commandLine.addConfOptionToCLI(config, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
- RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
- commandLine.addMandatoryConfOption(config, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
-
- if (clusterSecure) {
- // if the cluster is secure, make sure that
- // the relevant security settings go over
- commandLine.addConfOption(config, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
- }
-
- // copy over any/all YARN RM client values, in case the server-side XML conf file
- // has the 0.0.0.0 address
- commandLine.addConfOptions(config,
- YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.RM_CLUSTER_ID,
- YarnConfiguration.RM_HOSTNAME,
- YarnConfiguration.RM_PRINCIPAL);
-
- // write out the path output
- commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
-
- String cmdStr = commandLine.build();
- log.debug("Completed setting up app master command {}", cmdStr);
-
- amLauncher.addCommandLine(commandLine);
-
- // the Slider AM gets to configure the AM requirements, not the custom provider
- sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
- amLauncher.getResource());
-
-
- // Set the priority for the application master
- amLauncher.setPriority(config.getInt(KEY_YARN_QUEUE_PRIORITY,
- DEFAULT_YARN_QUEUE_PRIORITY));
-
- // Set the queue to which this application is to be submitted in the RM
- // Queue for App master
- String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
- String suppliedQueue = internalOperations.getGlobalOptions().get(INTERNAL_QUEUE);
- if(!isUnset(suppliedQueue)) {
- amQueue = suppliedQueue;
- log.info("Using queue {} for the application instance.", amQueue);
- }
-
- if (isSet(amQueue)) {
- amLauncher.setQueue(amQueue);
- }
- return amLauncher;
- }
-
- /**
- *
- * @param clustername name of the cluster
- * @param clusterDirectory cluster dir
- * @param instanceDefinition the instance definition
- * @param debugAM enable debug AM options
- * @param lifetime
- * @return the launched application
- * @throws YarnException
- * @throws IOException
- */
- public LaunchedApplication launchApplication(String clustername, Path clusterDirectory,
- AggregateConf instanceDefinition, boolean debugAM, long lifetime)
- throws YarnException, IOException {
-
- AppMasterLauncher amLauncher = setupAppMasterLauncher(clustername,
- clusterDirectory,
- instanceDefinition,
- debugAM, lifetime);
-
- applicationId = amLauncher.getApplicationId();
- log.info("Submitting application {}", applicationId);
-
- // submit the application
- LaunchedApplication launchedApplication = amLauncher.submitApplication();
- return launchedApplication;
- }
-
protected Map<String, String> getAmLaunchEnv(Configuration config) {
String sliderAmLaunchEnv = config.get(KEY_AM_LAUNCH_ENV);
log.debug("{} = {}", KEY_AM_LAUNCH_ENV, sliderAmLaunchEnv);
@@ -2431,95 +1642,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
return placeholderKeyValueMap;
}
- private void propagatePythonExecutable(Configuration config,
- AggregateConf instanceDefinition) {
- String pythonExec = config.get(
- PYTHON_EXECUTABLE_PATH);
- if (pythonExec != null) {
- instanceDefinition.getAppConfOperations().getGlobalOptions().putIfUnset(
- PYTHON_EXECUTABLE_PATH,
- pythonExec);
- }
- }
-
-
- /**
- * Wait for the launched app to be accepted in the time
- * and, optionally running.
- * <p>
- * If the application
- *
- * @param launchedApplication application
- * @param acceptWaitMillis time in millis to wait for accept
- * @param runWaitMillis time in millis to wait for the app to be running.
- * May be null, in which case no wait takes place
- * @return exit code: success
- * @throws YarnException
- * @throws IOException
- */
- public int waitForAppRunning(LaunchedApplication launchedApplication,
- int acceptWaitMillis, int runWaitMillis) throws YarnException, IOException {
- assert launchedApplication != null;
- int exitCode;
- // wait for the submit state to be reached
- ApplicationReport report = launchedApplication.monitorAppToState(
- YarnApplicationState.ACCEPTED,
- new Duration(acceptWaitMillis));
-
- // may have failed, so check that
- if (hasAppFinished(report)) {
- exitCode = buildExitCode(report);
- } else {
- // exit unless there is a wait
-
-
- if (runWaitMillis != 0) {
- // waiting for state to change
- Duration duration = new Duration(runWaitMillis * 1000);
- duration.start();
- report = launchedApplication.monitorAppToState(
- YarnApplicationState.RUNNING, duration);
- if (report != null &&
- report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
- exitCode = EXIT_SUCCESS;
- } else {
- exitCode = buildExitCode(report);
- }
- } else {
- exitCode = EXIT_SUCCESS;
- }
- }
- return exitCode;
- }
-
-
- /**
- * Propagate any critical principals from the current site config down to the HBase one.
- * @param config config to read from
- * @param clusterSpec cluster spec
- */
- private void propagatePrincipals(Configuration config,
- AggregateConf clusterSpec) {
- String dfsPrincipal = config.get(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
- if (dfsPrincipal != null) {
- String siteDfsPrincipal = SITE_XML_PREFIX + DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
- clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset(
- siteDfsPrincipal,
- dfsPrincipal);
- }
- }
-
- /**
- * Create a path that must exist in the cluster fs
- * @param uri uri to create
- * @return the path
- * @throws FileNotFoundException if the path does not exist
- */
- public Path createPathThatMustExist(String uri) throws
- SliderException, IOException {
- return sliderFileSystem.createPathThatMustExist(uri);
- }
-
/**
* verify that a live cluster isn't there
* @param clustername cluster name
@@ -2527,7 +1649,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
* @throws SliderException with exit code EXIT_CLUSTER_LIVE
* if a cluster of that name is either live or starting up.
*/
- public void verifyNoLiveClusters(String clustername, String action) throws
+ public void verifyNoLiveApp(String clustername, String action) throws
IOException,
YarnException {
List<ApplicationReport> existing = findAllLiveInstances(clustername);
@@ -2554,11 +1676,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
return deployedClusterName;
}
- @VisibleForTesting
- public void setDeployedClusterName(String deployedClusterName) {
- this.deployedClusterName = deployedClusterName;
- }
-
/**
* ask if the client is using a mini MR cluster
* @return true if they are
@@ -2568,109 +1685,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
false);
}
- /**
- * Get the application name used in the zookeeper root paths
- * @return an application-specific path in ZK
- */
- private String getAppName() {
- return "slider";
- }
-
- /**
- * Wait for the app to start running (or go past that state)
- * @param duration time to wait
- * @return the app report; null if the duration turned out
- * @throws YarnException YARN or app issues
- * @throws IOException IO problems
- */
- @VisibleForTesting
- public ApplicationReport monitorAppToRunning(Duration duration)
- throws YarnException, IOException {
- return monitorAppToState(YarnApplicationState.RUNNING, duration);
- }
-
- /**
- * Build an exit code for an application from its report.
- * If the report parameter is null, its interpreted as a timeout
- * @param report report application report
- * @return the exit code
- * @throws IOException
- * @throws YarnException
- */
- private int buildExitCode(ApplicationReport report) throws
- IOException,
- YarnException {
- if (null == report) {
- return EXIT_TIMED_OUT;
- }
-
- YarnApplicationState state = report.getYarnApplicationState();
- FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
- switch (state) {
- case FINISHED:
- if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
- log.info("Application has completed successfully");
- return EXIT_SUCCESS;
- } else {
- log.info("Application finished unsuccessfully." +
- "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop",
- state, dsStatus);
- return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
- }
-
- case KILLED:
- log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
- state, dsStatus);
- return EXIT_YARN_SERVICE_KILLED;
-
- case FAILED:
- log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
- dsStatus);
- return EXIT_YARN_SERVICE_FAILED;
-
- default:
- //not in any of these states
- return EXIT_SUCCESS;
- }
- }
-
- /**
- * Monitor the submitted application for reaching the requested state.
- * Will also report if the app reaches a later state (failed, killed, etc)
- * Kill application if duration!= null & time expires.
- * Prerequisite: the applicatin was launched.
- * @param desiredState desired state.
- * @param duration how long to wait -must be more than 0
- * @return the application report -null on a timeout
- * @throws YarnException
- * @throws IOException
- */
- @VisibleForTesting
- public ApplicationReport monitorAppToState(
- YarnApplicationState desiredState,
- Duration duration)
- throws YarnException, IOException {
- LaunchedApplication launchedApplication =
- new LaunchedApplication(applicationId, yarnClient);
- return launchedApplication.monitorAppToState(desiredState, duration);
- }
-
- @Override
- public ApplicationReport getApplicationReport() throws
- IOException,
- YarnException {
- return getApplicationReport(applicationId);
- }
-
- @Override
- public boolean forceKillApplication(String reason)
- throws YarnException, IOException {
- if (applicationId != null) {
- new LaunchedApplication(applicationId, yarnClient).forceKill(reason);
- return true;
- }
- return false;
- }
/**
* List Slider instances belonging to a specific user with a specific app
@@ -2721,23 +1735,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
/**
- * Retrieve a list of all live instances. If clustername is supplied then it
- * returns this specific cluster, if and only if it exists and is live.
- *
- * @param clustername
- * cluster name (if looking for a specific live cluster)
- * @return the list of application names which satisfies the list criteria
- * @throws IOException
- * @throws YarnException
- */
- public Set<ApplicationReport> getApplicationList(String clustername)
- throws IOException, YarnException {
- ActionListArgs args = new ActionListArgs();
- args.live = true;
- return getApplicationList(clustername, args);
- }
-
- /**
* Retrieve a list of application instances satisfying the query criteria.
*
* @param clustername
@@ -2757,8 +1754,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
// the above call throws an exception so the return is not really required
return Collections.emptySet();
}
- verifyBindingsDefined();
-
boolean live = args.live;
String state = args.state;
boolean listContainers = args.containers;
@@ -2868,29 +1863,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
}
- /**
- * Enumerate slider instances for the current user, and the
- * most recent app report, where available.
- * @param listOnlyInState boolean to indicate that the instances should
- * only include those in a YARN state
- * <code> minAppState <= currentState <= maxAppState </code>
- *
- * @param minAppState minimum application state to include in enumeration.
- * @param maxAppState maximum application state to include
- * @return a map of application instance name to description
- * @throws IOException Any IO problem
- * @throws YarnException YARN problems
- */
- @Override
- public Map<String, SliderInstanceDescription> enumSliderInstances(
- boolean listOnlyInState,
- YarnApplicationState minAppState,
- YarnApplicationState maxAppState)
- throws IOException, YarnException {
- return yarnAppListClient.enumSliderInstances(listOnlyInState,
- minAppState,
- maxAppState);
- }
/**
* Extract the state of a Yarn application --state argument
@@ -2928,22 +1900,16 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
@Override
@VisibleForTesting
- public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
- validateClusterName(name);
- Map<String, String> roleMap = args.getComponentMap();
- // throw usage exception if no changes proposed
- if (roleMap.size() == 0) {
- actionHelp(ACTION_FLEX);
- }
- verifyBindingsDefined();
- log.debug("actionFlex({})", name);
- Map<String, String> roleInstances = new HashMap<>();
- for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) {
- String key = roleEntry.getKey();
- String val = roleEntry.getValue();
- roleInstances.put(key, val);
- }
- return flex(name, roleInstances);
+ public void actionFlex(String appName, ActionFlexArgs args)
+ throws YarnException, IOException {
+ Component component = new Component();
+ component.setNumberOfContainers(args.getNumberOfContainers());
+ if (StringUtils.isEmpty(args.getComponent())) {
+ component.setName("DEFAULT");
+ } else {
+ component.setName(args.getComponent());
+ }
+ flex(appName, component);
}
@Override
@@ -2954,7 +1920,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
public int actionExists(String name, ActionExistsArgs args) throws YarnException, IOException {
- verifyBindingsDefined();
validateClusterName(name);
boolean checkLive = args.live;
log.debug("actionExists({}, {}, {})", name, checkLive, args.state);
@@ -3050,14 +2015,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
/**
- * Get at the service registry operations
- * @return registry client -valid after the service is inited.
- */
- public YarnAppListClient getYarnAppListClient() {
- return yarnAppListClient;
- }
-
- /**
* Find an instance of an application belonging to the current user.
* @param appname application name
* @return the app report or null if none is found
@@ -3128,20 +2085,20 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
return EXIT_SUCCESS;
}
- ClusterDescription status = verifyAndGetClusterDescription(clustername);
+ Application application = getApplication(clustername);
String outfile = statusArgs.getOutput();
if (outfile == null) {
- log.info(status.toJsonString());
+ log.info(application.toString());
} else {
- status.save(new File(outfile).getAbsoluteFile());
+ jsonSerDeser.save(application, new File(statusArgs.getOutput()));
}
return EXIT_SUCCESS;
}
@Override
- public String actionStatus(String clustername)
+ public Application actionStatus(String clustername)
throws YarnException, IOException {
- return verifyAndGetClusterDescription(clustername).toJsonString();
+ return getApplication(clustername);
}
private void queryAndPrintLifetime(String appName)
@@ -3170,13 +2127,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
}
- private ClusterDescription verifyAndGetClusterDescription(String clustername)
- throws YarnException, IOException {
- verifyBindingsDefined();
- validateClusterName(clustername);
- return getClusterDescription(clustername);
- }
-
@Override
public int actionVersion() {
SliderVersionInfo.loadAndPrintVersionInfo(log);
@@ -3184,269 +2134,106 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
}
@Override
- public int actionFreeze(String clustername,
- ActionFreezeArgs freezeArgs) throws YarnException, IOException {
- verifyBindingsDefined();
- validateClusterName(clustername);
- int waittime = freezeArgs.getWaittime();
- String text = freezeArgs.message;
- boolean forcekill = freezeArgs.force;
- log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
- text,
- waittime,
- forcekill);
-
- //is this actually a known cluster?
- sliderFileSystem.locateInstanceDefinition(clustername);
- ApplicationReport app = findInstance(clustername);
+ public void actionStop(String appName, ActionFreezeArgs freezeArgs)
+ throws YarnException, IOException {
+ validateClusterName(appName);
+ ApplicationReport app = findInstance(appName);
if (app == null) {
- // exit early
- log.info("Cluster {} not running", clustername);
- // not an error to stop a stopped cluster
- return EXIT_SUCCESS;
- }
- log.debug("App to stop was found: {}:\n{}", clustername,
- new OnDemandReportStringifier(app));
- if (app.getYarnApplicationState().ordinal() >=
- YarnApplicationState.FINISHED.ordinal()) {
- log.info("Cluster {} is in a terminated state {}", clustername,
- app.getYarnApplicationState());
- return EXIT_SUCCESS;
+ throw new ApplicationNotFoundException(
+ "Application " + appName + " doesn't exist in RM.");
}
- // IPC request for a managed shutdown is only possible if the app is running.
- // so we need to force kill if the app is accepted or submitted
- if (!forcekill
- && app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING.ordinal()) {
- log.info("Cluster {} is in a pre-running state {}. Force killing it", clustername,
+ if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED
+ .ordinal()) {
+ log.info("Application {} is in a terminated state {}", appName,
app.getYarnApplicationState());
- forcekill = true;
- }
-
- LaunchedApplication application = new LaunchedApplication(yarnClient, ap
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org