You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2016/08/25 20:20:08 UTC
[39/46] incubator-slider git commit: SLIDER-1165 Create
yarn-native-services branch on Slider corresponding to the
yarn-native-services branch on Hadoop
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
deleted file mode 100644
index fd3647d..0000000
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ /dev/null
@@ -1,4572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.slider.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathNotFoundException;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.registry.client.exceptions.NoRecordException;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.RegistryPathStatus;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.KerberosDiags;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.ClusterNode;
-import org.apache.slider.api.SliderApplicationApi;
-import org.apache.slider.api.SliderClusterProtocol;
-import org.apache.slider.api.StateValues;
-import org.apache.slider.api.proto.Messages;
-import org.apache.slider.api.types.ContainerInformation;
-import org.apache.slider.api.types.NodeInformationList;
-import org.apache.slider.api.types.SliderInstanceDescription;
-import org.apache.slider.client.ipc.SliderApplicationIpcClient;
-import org.apache.slider.client.ipc.SliderClusterOperations;
-import org.apache.slider.common.Constants;
-import org.apache.slider.common.SliderExitCodes;
-import org.apache.slider.common.SliderKeys;
-import org.apache.slider.common.params.AbstractActionArgs;
-import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
-import org.apache.slider.common.params.ActionAMSuicideArgs;
-import org.apache.slider.common.params.ActionClientArgs;
-import org.apache.slider.common.params.ActionCreateArgs;
-import org.apache.slider.common.params.ActionDependencyArgs;
-import org.apache.slider.common.params.ActionDestroyArgs;
-import org.apache.slider.common.params.ActionDiagnosticArgs;
-import org.apache.slider.common.params.ActionEchoArgs;
-import org.apache.slider.common.params.ActionExistsArgs;
-import org.apache.slider.common.params.ActionFlexArgs;
-import org.apache.slider.common.params.ActionFreezeArgs;
-import org.apache.slider.common.params.ActionInstallKeytabArgs;
-import org.apache.slider.common.params.ActionInstallPackageArgs;
-import org.apache.slider.common.params.ActionKDiagArgs;
-import org.apache.slider.common.params.ActionKeytabArgs;
-import org.apache.slider.common.params.ActionKillContainerArgs;
-import org.apache.slider.common.params.ActionListArgs;
-import org.apache.slider.common.params.ActionLookupArgs;
-import org.apache.slider.common.params.ActionNodesArgs;
-import org.apache.slider.common.params.ActionPackageArgs;
-import org.apache.slider.common.params.ActionRegistryArgs;
-import org.apache.slider.common.params.ActionResolveArgs;
-import org.apache.slider.common.params.ActionResourceArgs;
-import org.apache.slider.common.params.ActionStatusArgs;
-import org.apache.slider.common.params.ActionThawArgs;
-import org.apache.slider.common.params.ActionTokensArgs;
-import org.apache.slider.common.params.ActionUpgradeArgs;
-import org.apache.slider.common.params.Arguments;
-import org.apache.slider.common.params.ClientArgs;
-import org.apache.slider.common.params.CommonArgs;
-import org.apache.slider.common.params.LaunchArgsAccessor;
-import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.Duration;
-import org.apache.slider.common.tools.SliderFileSystem;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.common.tools.SliderVersionInfo;
-import org.apache.slider.core.build.InstanceBuilder;
-import org.apache.slider.core.build.InstanceIO;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
-import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
-import org.apache.slider.core.exceptions.BadClusterStateException;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.exceptions.ErrorStrings;
-import org.apache.slider.core.exceptions.NoSuchNodeException;
-import org.apache.slider.core.exceptions.NotFoundException;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
-import org.apache.slider.core.exceptions.UsageException;
-import org.apache.slider.core.exceptions.WaitTimeoutException;
-import org.apache.slider.core.launch.AppMasterLauncher;
-import org.apache.slider.core.launch.ClasspathConstructor;
-import org.apache.slider.core.launch.CredentialUtils;
-import org.apache.slider.core.launch.JavaCommandLineBuilder;
-import org.apache.slider.core.launch.LaunchedApplication;
-import org.apache.slider.core.launch.RunningApplication;
-import org.apache.slider.core.launch.SerializedApplicationReport;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.core.persist.AppDefinitionPersister;
-import org.apache.slider.core.persist.ApplicationReportSerDeser;
-import org.apache.slider.core.persist.ConfPersister;
-import org.apache.slider.core.persist.JsonSerDeser;
-import org.apache.slider.core.persist.LockAcquireFailedException;
-import org.apache.slider.core.registry.SliderRegistryUtils;
-import org.apache.slider.core.registry.YarnAppListClient;
-import org.apache.slider.core.registry.docstore.ConfigFormat;
-import org.apache.slider.core.registry.docstore.PublishedConfigSet;
-import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
-import org.apache.slider.core.registry.docstore.PublishedExports;
-import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
-import org.apache.slider.core.registry.docstore.PublishedExportsSet;
-import org.apache.slider.core.registry.retrieve.RegistryRetriever;
-import org.apache.slider.core.zk.BlockingZKWatcher;
-import org.apache.slider.core.zk.ZKIntegration;
-import org.apache.slider.core.zk.ZKPathBuilder;
-import org.apache.slider.providers.AbstractClientProvider;
-import org.apache.slider.providers.SliderProviderFactory;
-import org.apache.slider.providers.agent.AgentKeys;
-import org.apache.slider.providers.slideram.SliderAMClientProvider;
-import org.apache.slider.server.appmaster.SliderAppMaster;
-import org.apache.slider.server.appmaster.rpc.RpcBinder;
-import org.apache.slider.server.services.security.SecurityStore;
-import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.InterruptedIOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
-import static org.apache.slider.api.InternalKeys.*;
-import static org.apache.slider.api.OptionKeys.*;
-import static org.apache.slider.api.ResourceKeys.*;
-import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
-import static org.apache.slider.common.params.SliderActions.*;
-import static org.apache.slider.common.tools.SliderUtils.*;
-
-
-/**
- * Client service for Slider
- */
-
-public class SliderClient extends AbstractSliderLaunchedService implements RunService,
- SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
- private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
- public static final String E_MUST_BE_A_VALID_JSON_FILE
- = "Invalid configuration. Must be a valid json file.";
- public static final String E_INVALID_INSTALL_LOCATION
- = "A valid install location must be provided for the client.";
- public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
- = "Unable to read supplied package file";
- public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION
- = "A valid application package location required.";
- public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory";
- public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist";
- public static final String E_INVALID_APPLICATION_TYPE_NAME
- = "A valid application type name is required (e.g. HBASE).";
- public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite.";
- public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist";
- public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined";
- public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
- public static final String E_PACKAGE_EXISTS = "Package exists";
- private static PrintStream clientOutputStream = System.out;
-
- // value should not be changed without updating string find in slider.py
- private static final String PASSWORD_PROMPT = "Enter password for";
-
- private ClientArgs serviceArgs;
- public ApplicationId applicationId;
-
- private String deployedClusterName;
- /**
- * Cluster operations against the deployed cluster -will be null
- * if no bonding has yet taken place
- */
- private SliderClusterOperations sliderClusterOperations;
-
- protected SliderFileSystem sliderFileSystem;
-
- /**
- * Yarn client service
- */
- private SliderYarnClientImpl yarnClient;
- private YarnAppListClient yarnAppListClient;
- private AggregateConf launchedInstanceDefinition;
-
- /**
- * The YARN registry service
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private RegistryOperations registryOperations;
-
- /**
- * Constructor
- */
- public SliderClient() {
- super("Slider Client");
- new HdfsConfiguration();
- new YarnConfiguration();
- }
-
- /**
- * This is called <i>Before serviceInit is called</i>
- * @param config the initial configuration build up by the
- * service launcher.
- * @param args argument list list of arguments passed to the command line
- * after any launcher-specific commands have been stripped.
- * @return the post-binding configuration to pass to the <code>init()</code>
- * operation.
- * @throws Exception
- */
- @Override
- public Configuration bindArgs(Configuration config, String... args) throws Exception {
- config = super.bindArgs(config, args);
- serviceArgs = new ClientArgs(args);
- serviceArgs.parse();
- // add the slider XML config
- ConfigHelper.injectSliderXMLResource();
- // yarn-ify
- YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
- return patchConfiguration(yarnConfiguration);
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- Configuration clientConf = loadSliderClientXML();
- ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true);
- serviceArgs.applyDefinitions(conf);
- serviceArgs.applyFileSystemBinding(conf);
- AbstractActionArgs coreAction = serviceArgs.getCoreAction();
- // init security with our conf
- if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) {
- forceLogin();
- initProcessSecurity(conf);
- }
- if (coreAction.getHadoopServicesRequired()) {
- initHadoopBinding();
- }
- super.serviceInit(conf);
- }
-
- /**
- * Launched service execution. This runs {@link #exec()}
- * then catches some exceptions and converts them to exit codes
- * @return an exit code
- * @throws Throwable
- */
- @Override
- public int runService() throws Throwable {
- try {
- return exec();
- } catch (FileNotFoundException | PathNotFoundException nfe) {
- throw new NotFoundException(nfe, nfe.toString());
- }
- }
-
- /**
- * Execute the command line
- * @return an exit code
- * @throws Throwable on a failure
- */
- public int exec() throws Throwable {
-
- // choose the action
- String action = serviceArgs.getAction();
- if (isUnset(action)) {
- throw new SliderException(EXIT_USAGE, serviceArgs.usage());
- }
-
- int exitCode = EXIT_SUCCESS;
- String clusterName = serviceArgs.getClusterName();
- // actions
-
- switch (action) {
- case ACTION_AM_SUICIDE:
- exitCode = actionAmSuicide(clusterName,
- serviceArgs.getActionAMSuicideArgs());
- break;
-
- case ACTION_BUILD:
- exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
- break;
-
- case ACTION_CLIENT:
- exitCode = actionClient(serviceArgs.getActionClientArgs());
- break;
-
- case ACTION_CREATE:
- exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
- break;
-
- case ACTION_DEPENDENCY:
- exitCode = actionDependency(serviceArgs.getActionDependencyArgs());
- break;
-
- case ACTION_DESTROY:
- exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs());
- break;
-
- case ACTION_DIAGNOSTICS:
- exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
- break;
-
- case ACTION_EXISTS:
- exitCode = actionExists(clusterName,
- serviceArgs.getActionExistsArgs());
- break;
-
- case ACTION_FLEX:
- exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
- break;
-
- case ACTION_FREEZE:
- exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
- break;
-
- case ACTION_HELP:
- log.info(serviceArgs.usage());
- break;
-
- case ACTION_KDIAG:
- exitCode = actionKDiag(serviceArgs.getActionKDiagArgs());
- break;
-
- case ACTION_KILL_CONTAINER:
- exitCode = actionKillContainer(clusterName,
- serviceArgs.getActionKillContainerArgs());
- break;
-
- case ACTION_INSTALL_KEYTAB:
- exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs());
- break;
-
- case ACTION_INSTALL_PACKAGE:
- exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
- break;
-
- case ACTION_KEYTAB:
- exitCode = actionKeytab(serviceArgs.getActionKeytabArgs());
- break;
-
- case ACTION_LIST:
- exitCode = actionList(clusterName, serviceArgs.getActionListArgs());
- break;
-
- case ACTION_LOOKUP:
- exitCode = actionLookup(serviceArgs.getActionLookupArgs());
- break;
-
- case ACTION_NODES:
- exitCode = actionNodes("", serviceArgs.getActionNodesArgs());
- break;
-
- case ACTION_PACKAGE:
- exitCode = actionPackage(serviceArgs.getActionPackageArgs());
- break;
-
- case ACTION_REGISTRY:
- exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
- break;
-
- case ACTION_RESOLVE:
- exitCode = actionResolve(serviceArgs.getActionResolveArgs());
- break;
-
- case ACTION_RESOURCE:
- exitCode = actionResource(serviceArgs.getActionResourceArgs());
- break;
-
- case ACTION_STATUS:
- exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs());
- break;
-
- case ACTION_THAW:
- exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
- break;
-
- case ACTION_TOKENS:
- exitCode = actionTokens(serviceArgs.getActionTokenArgs());
- break;
-
- case ACTION_UPDATE:
- exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
- break;
-
- case ACTION_UPGRADE:
- exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs());
- break;
-
- case ACTION_VERSION:
- exitCode = actionVersion();
- break;
-
- default:
- throw new SliderException(EXIT_UNIMPLEMENTED,
- "Unimplemented: " + action);
- }
-
- return exitCode;
- }
-
- /**
- * Perform everything needed to init the hadoop binding.
- * This assumes that the service is already in inited or started state
- * @throws IOException
- * @throws SliderException
- */
- protected void initHadoopBinding() throws IOException, SliderException {
- // validate the client
- validateSliderClientEnvironment(null);
- //create the YARN client
- yarnClient = new SliderYarnClientImpl();
- yarnClient.init(getConfig());
- if (getServiceState() == STATE.STARTED) {
- yarnClient.start();
- }
- addService(yarnClient);
- yarnAppListClient =
- new YarnAppListClient(yarnClient, getUsername(), getConfig());
- // create the filesystem
- sliderFileSystem = new SliderFileSystem(getConfig());
- }
-
- /**
- * Delete the zookeeper node associated with the calling user and the cluster
- * TODO: YARN registry operations
- **/
- @VisibleForTesting
- public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
- String user = getUsername();
- String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
- Exception e = null;
- try {
- Configuration config = getConfig();
- ZKIntegration client = getZkClient(clusterName, user);
- if (client != null) {
- if (client.exists(zkPath)) {
- log.info("Deleting zookeeper path {}", zkPath);
- }
- client.deleteRecursive(zkPath);
- return true;
- }
- } catch (InterruptedException | BadConfigException | KeeperException ex) {
- e = ex;
- }
- if (e != null) {
- log.warn("Unable to recursively delete zk node {}", zkPath, e);
- }
-
- return false;
- }
-
- /**
- * Create the zookeeper node associated with the calling user and the cluster
- *
- * @param clusterName slider application name
- * @param nameOnly should the name only be created (i.e. don't create ZK node)
- * @return the path, using the policy implemented in
- * {@link ZKIntegration#mkClusterPath(String, String)}
- * @throws YarnException
- * @throws IOException
- */
- @VisibleForTesting
- public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
- try {
- return createZookeeperNodeInner(clusterName, nameOnly);
- } catch (KeeperException.NodeExistsException e) {
- return null;
- } catch (KeeperException e) {
- return null;
- } catch (InterruptedException e) {
- throw new InterruptedIOException(e.toString());
- }
- }
-
- /**
- * Create the zookeeper node associated with the calling user and the cluster
- * -throwing exceptions on any failure
- * @param clusterName cluster name
- * @param nameOnly create the path, not the node
- * @return the path, using the policy implemented in
- * {@link ZKIntegration#mkClusterPath(String, String)}
- * @throws YarnException
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @VisibleForTesting
- public String createZookeeperNodeInner(String clusterName, Boolean nameOnly)
- throws YarnException, IOException, KeeperException, InterruptedException {
- String user = getUsername();
- String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
- if (nameOnly) {
- return zkPath;
- }
- ZKIntegration client = getZkClient(clusterName, user);
- if (client != null) {
- // set up the permissions. This must be done differently on a secure cluster from an insecure
- // one
- List<ACL> zkperms = new ArrayList<>();
- if (UserGroupInformation.isSecurityEnabled()) {
- zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
- zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
- } else {
- zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
- }
- client.createPath(zkPath, "",
- zkperms,
- CreateMode.PERSISTENT);
- return zkPath;
- } else {
- return null;
- }
- }
-
- /**
- * Gets a zookeeper client, returns null if it cannot connect to zookeeper
- **/
- protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
- String registryQuorum = lookupZKQuorum();
- ZKIntegration client = null;
- try {
- BlockingZKWatcher watcher = new BlockingZKWatcher();
- client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher,
- ZKIntegration.SESSION_TIMEOUT);
- client.init();
- watcher.waitForZKConnection(2 * 1000);
- } catch (InterruptedException e) {
- client = null;
- log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
- } catch (IOException e) {
- log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
- }
- return client;
- }
-
- /**
- * Keep this signature for backward compatibility with
- * force=true by default.
- */
- @Override
- public int actionDestroy(String clustername) throws YarnException,
- IOException {
- ActionDestroyArgs destroyArgs = new ActionDestroyArgs();
- destroyArgs.force = true;
- return actionDestroy(clustername, destroyArgs);
- }
-
- @Override
- public int actionDestroy(String clustername,
- ActionDestroyArgs destroyArgs) throws YarnException, IOException {
- // verify that a live cluster isn't there
- validateClusterName(clustername);
- //no=op, it is now mandatory.
- verifyBindingsDefined();
- verifyNoLiveClusters(clustername, "Destroy");
- boolean forceDestroy = destroyArgs.force;
- log.debug("actionDestroy({}, force={})", clustername, forceDestroy);
-
- // create the directory path
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
- // delete the directory;
- FileSystem fs = sliderFileSystem.getFileSystem();
- boolean exists = fs.exists(clusterDirectory);
- if (exists) {
- log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
- if (!forceDestroy) {
- // fail the command if --force is not explicitly specified
- throw new UsageException("Destroy will permanently delete directories and registries. "
- + "Reissue this command with the --force option if you want to proceed.");
- }
- if (!fs.delete(clusterDirectory, true)) {
- log.warn("Filesystem returned false from delete() operation");
- }
-
- if(!deleteZookeeperNode(clustername)) {
- log.warn("Unable to perform node cleanup in Zookeeper.");
- }
-
- if (fs.exists(clusterDirectory)) {
- log.warn("Failed to delete {}", clusterDirectory);
- }
-
- } else {
- log.debug("Application Instance {} already destroyed", clustername);
- }
-
- // rm the registry entry \u2014do not let this block the destroy operations
- String registryPath = SliderRegistryUtils.registryPathForInstance(
- clustername);
- try {
- getRegistryOperations().delete(registryPath, true);
- } catch (IOException e) {
- log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
- } catch (SliderException e) {
- log.warn("Error binding to registry {} ", e, e);
- }
-
- List<ApplicationReport> instances = findAllLiveInstances(clustername);
- // detect any race leading to cluster creation during the check/destroy process
- // and report a problem.
- if (!instances.isEmpty()) {
- throw new SliderException(EXIT_APPLICATION_IN_USE,
- clustername + ": "
- + E_DESTROY_CREATE_RACE_CONDITION
- + " :" +
- instances.get(0));
- }
- log.info("Destroyed cluster {}", clustername);
- return EXIT_SUCCESS;
- }
-
- @Override
- public int actionAmSuicide(String clustername,
- ActionAMSuicideArgs args) throws YarnException, IOException {
- SliderClusterOperations clusterOperations =
- createClusterOperations(clustername);
- clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
- return EXIT_SUCCESS;
- }
-
- @Override
- public AbstractClientProvider createClientProvider(String provider)
- throws SliderException {
- SliderProviderFactory factory =
- SliderProviderFactory.createSliderProviderFactory(provider);
- return factory.createClientProvider();
- }
-
- /**
- * Create the cluster -saving the arguments to a specification file first
- * @param clustername cluster name
- * @return the status code
- * @throws YarnException Yarn problems
- * @throws IOException other problems
- * @throws BadCommandArgumentsException bad arguments.
- */
- public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
- YarnException,
- IOException {
-
- actionBuild(clustername, createArgs);
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
- AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
- clustername, clusterDirectory);
- try {
- checkForCredentials(getConfig(), instanceDefinition.getAppConf(),
- clustername);
- } catch (IOException e) {
- sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
- throw e;
- }
- return startCluster(clustername, createArgs);
- }
-
- @Override
- public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
- throws YarnException, IOException {
- File template = upgradeArgs.template;
- File resources = upgradeArgs.resources;
- List<String> containers = upgradeArgs.containers;
- List<String> components = upgradeArgs.components;
-
- // For upgrade spec, let's be little more strict with validation. If either
- // --template or --resources is specified, then both needs to be specified.
- // Otherwise the internal app config and resources states of the app will be
- // unwantedly modified and the change will take effect to the running app
- // immediately.
- require(!(template != null && resources == null),
- "Option %s must be specified with option %s",
- Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
-
- require(!(resources != null && template == null),
- "Option %s must be specified with option %s",
- Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
-
- // For upgrade spec, both --template and --resources should be specified
- // and neither of --containers or --components should be used
- if (template != null && resources != null) {
- require(CollectionUtils.isEmpty(containers),
- "Option %s cannot be specified with %s or %s",
- Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
- Arguments.ARG_RESOURCES);
- require(CollectionUtils.isEmpty(components),
- "Option %s cannot be specified with %s or %s",
- Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
- Arguments.ARG_RESOURCES);
-
- // not an error to try to upgrade a stopped cluster, just return success
- // code, appropriate log messages have already been dumped
- if (!isAppInRunningState(clustername)) {
- return EXIT_SUCCESS;
- }
-
- // Now initiate the upgrade spec flow
- buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
- SliderClusterOperations clusterOperations = createClusterOperations(clustername);
- clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000);
- return EXIT_SUCCESS;
- }
-
- // Since neither --template or --resources were specified, it is upgrade
- // containers flow. Here any one or both of --containers and --components
- // can be specified. If a container is specified with --containers option
- // and also belongs to a component type specified with --components, it will
- // be upgraded only once.
- return actionUpgradeContainers(clustername, upgradeArgs);
- }
-
- /**
-  * Issue a container-level upgrade request to the application master.
-  * The containers and components named in the arguments are first checked
-  * against the list of live containers; names that match nothing are logged
-  * as warnings and dropped from the request.
-  *
-  * @param clustername name of the cluster
-  * @param upgradeArgs upgrade arguments carrying the container ids and
-  *                    component names to upgrade
-  * @return EXIT_SUCCESS if the request was issued or the application is not
-  *         running, EXIT_NOT_FOUND if nothing valid was requested
-  * @throws YarnException
-  * @throws IOException
-  */
- private int actionUpgradeContainers(String clustername,
-     ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
-   verifyBindingsDefined();
-   validateClusterName(clustername);
-   int waittime = upgradeArgs.getWaittime(); // ignored for now
-   String text = "Upgrade containers";
-   log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
-       text, waittime);
-
-   // Upgrading a cluster that is not running is not treated as an error;
-   // isAppInRunningState() has already logged the reason.
-   if (!isAppInRunningState(clustername)) {
-     return EXIT_SUCCESS;
-   }
-
-   // Sets remove duplicates and give cheap lookups. Entries are taken out as
-   // they are matched, so whatever is left afterwards is invalid.
-   Set<String> pendingContainers = new HashSet<>();
-   if (upgradeArgs.containers != null) {
-     pendingContainers.addAll(upgradeArgs.containers);
-   }
-   Set<String> pendingComponents = new HashSet<>();
-   if (upgradeArgs.components != null) {
-     pendingComponents.addAll(upgradeArgs.components);
-   }
-
-   // Match the requested names against the live containers
-   Set<String> validContainers = new HashSet<>();
-   Set<String> validComponents = new HashSet<>();
-   for (ContainerInformation live : getContainers(clustername)) {
-     if (pendingContainers.isEmpty() && pendingComponents.isEmpty()) {
-       // everything requested has been accounted for
-       break;
-     }
-     if (pendingContainers.remove(live.containerId)) {
-       validContainers.add(live.containerId);
-     }
-     if (pendingComponents.remove(live.component)) {
-       validComponents.add(live.component);
-     }
-   }
-
-   // Anything still pending did not match a live container: warn and go on.
-   if (!pendingContainers.isEmpty()) {
-     log.warn("Invalid set of containers provided {}", pendingContainers);
-   }
-   if (!pendingComponents.isEmpty()) {
-     log.warn("Invalid set of components provided {}", pendingComponents);
-   }
-
-   // Without at least one valid container or component there is no request
-   boolean nothingToUpgrade =
-       validContainers.isEmpty() && validComponents.isEmpty();
-   if (nothingToUpgrade) {
-     log.error("Not a single valid container or component specified. Nothing to do.");
-     return EXIT_NOT_FOUND;
-   }
-
-   SliderClusterProtocol appMaster = connect(findInstance(clustername));
-   Messages.UpgradeContainersRequestProto.Builder request =
-       Messages.UpgradeContainersRequestProto.newBuilder();
-   request.setMessage(text);
-   request.addAllContainer(validContainers);
-   request.addAllComponent(validComponents);
-   appMaster.upgradeContainers(request.build());
-
-   log.info("Cluster upgrade issued for -");
-   if (!validContainers.isEmpty()) {
-     log.info(" Containers (total {}): {}", validContainers.size(),
-         validContainers);
-   }
-   if (!validComponents.isEmpty()) {
-     log.info(" Components (total {}): {}", validComponents.size(),
-         validComponents);
-   }
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Check whether the named application instance is currently RUNNING.
-  * The reason is logged whenever the answer is "no".
-  *
-  * @param clustername name of the cluster
-  * @return true if and only if the application is in the RUNNING state
-  * @throws YarnException
-  * @throws IOException
-  */
- private boolean isAppInRunningState(String clustername) throws YarnException,
-     IOException {
-   // is this actually a known cluster?
-   sliderFileSystem.locateInstanceDefinition(clustername);
-   ApplicationReport app = findInstance(clustername);
-   if (app == null) {
-     // exit early
-     log.info("Cluster {} not running", clustername);
-     return false;
-   }
-   log.debug("App to upgrade was found: {}:\n{}", clustername,
-       new OnDemandReportStringifier(app));
-
-   // Decide on the named states rather than on ordinal() comparisons, so the
-   // outcome does not depend on the declaration order of the enum constants.
-   YarnApplicationState state = app.getYarnApplicationState();
-   switch (state) {
-     case RUNNING:
-       // IPC request to upgrade containers is possible if the app is running.
-       return true;
-     case FINISHED:
-     case FAILED:
-     case KILLED:
-       log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.",
-           clustername, state, ACTION_UPDATE);
-       return false;
-     default:
-       log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
-           + "to be RUNNING.", clustername, state);
-       return false;
-   }
- }
-
- /**
-  * Make sure every credential alias requested by a configuration tree exists
-  * in its credential provider, prompting on stdin for each missing one.
-  * The tokens ${CLUSTER_NAME} and ${CLUSTER} in a provider path are replaced
-  * with the cluster name before the provider is looked up.
-  *
-  * @param conf base configuration; copied per provider, never modified
-  * @param tree configuration tree whose credentials map is inspected
-  * @param clusterName name substituted into the provider paths
-  * @throws IOException on failure to read a password or to store a credential
-  */
- protected static void checkForCredentials(Configuration conf,
-     ConfTree tree, String clusterName) throws IOException {
-   if (tree.credentials == null || tree.credentials.isEmpty()) {
-     log.info("No credentials requested");
-     return;
-   }
-
-   BufferedReader br = null;
-   try {
-     for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
-       // Literal substitution: String.replace() does not interpret '$' or
-       // '\' in the cluster name, unlike a replaceAll() replacement string.
-       String provider = cred.getKey()
-           .replace("${CLUSTER_NAME}", clusterName)
-           .replace("${CLUSTER}", clusterName);
-       List<String> aliases = cred.getValue();
-       if (aliases == null || aliases.isEmpty()) {
-         continue;
-       }
-       Configuration c = new Configuration(conf);
-       c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
-       CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0);
-       Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases());
-       for (String alias : aliases) {
-         if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
-           log.info("Credentials for " + alias + " found in " + provider);
-         } else {
-           if (br == null) {
-             // opened lazily: only needed when something must be prompted for
-             br = new BufferedReader(new InputStreamReader(System.in));
-           }
-           char[] pass = readPassword(alias, br);
-           try {
-             credentialProvider.createCredentialEntry(alias, pass);
-             credentialProvider.flush();
-           } finally {
-             // wipe the password even when storing it fails
-             Arrays.fill(pass, ' ');
-           }
-         }
-       }
-     }
-   } finally {
-     org.apache.hadoop.io.IOUtils.closeStream(br);
-   }
- }
-
- /**
-  * Prompt for a single password on stdin.
-  * The reader wrapped around System.in is closed before returning.
-  *
-  * @param alias name shown in the prompt
-  * @return the password entered
-  * @throws IOException if the password cannot be read
-  */
- private static char[] readOnePassword(String alias) throws IOException {
-   try (BufferedReader stdinReader =
-       new BufferedReader(new InputStreamReader(System.in))) {
-     char[] password = readPassword(alias, stdinReader);
-     return password;
-   }
- }
-
- /**
-  * Prompt for a password twice and return it once both entries match,
-  * re-prompting for as long as they differ.
-  * A normal reader is used instead of a secure one, because stdin is not
-  * hooked up to the command line.
-  *
-  * @param alias name shown in the prompt
-  * @param br reader to take the password lines from
-  * @return the confirmed password
-  * @throws IOException if the input ends before a password has been
-  *                     confirmed, or if reading fails
-  */
- private static char[] readPassword(String alias, BufferedReader br)
-     throws IOException {
-   while (true) {
-     log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
-     String firstEntry = br.readLine();
-     if (firstEntry == null) {
-       // readLine() returns null at end of stream
-       throw new IOException("Could not read credentials for " + alias +
-           " from stdin");
-     }
-     char[] newPassword1 = firstEntry.toCharArray();
-
-     log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
-     String secondEntry = br.readLine();
-     if (secondEntry == null) {
-       Arrays.fill(newPassword1, ' ');
-       throw new IOException("Could not read credentials for " + alias +
-           " from stdin");
-     }
-     char[] newPassword2 = secondEntry.toCharArray();
-
-     boolean match = Arrays.equals(newPassword1, newPassword2);
-     // the confirmation copy is never returned, so always wipe it
-     Arrays.fill(newPassword2, ' ');
-     if (match) {
-       return newPassword1;
-     }
-     Arrays.fill(newPassword1, ' ');
-     log.info("Passwords don't match. Try again.");
-   }
- }
-
- /**
-  * Build and persist an application instance definition. Existing
-  * definitions are not overwritten and a live cluster is not allowed.
-  */
- @Override
- public int actionBuild(String clustername,
-     AbstractClusterBuildingActionArgs buildInfo)
-     throws YarnException, IOException {
-   final boolean overwrite = false;
-   final boolean liveClusterAllowed = false;
-   buildInstanceDefinition(clustername, buildInfo, overwrite,
-       liveClusterAllowed);
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Dispatch a keytab command to the install, delete or list handler,
-  * checked in that order. Fails if none of the three was selected.
-  */
- @Override
- public int actionKeytab(ActionKeytabArgs keytabInfo)
-     throws YarnException, IOException {
-   if (keytabInfo.install) {
-     return actionInstallKeytab(keytabInfo);
-   }
-   if (keytabInfo.delete) {
-     return actionDeleteKeytab(keytabInfo);
-   }
-   if (keytabInfo.list) {
-     return actionListKeytab(keytabInfo);
-   }
-   String usage = CommonArgs.usage(serviceArgs, ACTION_KEYTAB);
-   throw new BadCommandArgumentsException(
-       "Keytab option specified not found.\n" + usage);
- }
-
- /**
-  * Log the path of every file found, recursively, under the keytab
-  * installation directory, or under the given sub-folder of it.
-  *
-  * @param keytabInfo keytab arguments; a null folder is treated as empty
-  * @return EXIT_SUCCESS
-  * @throws IOException on a filesystem failure
-  */
- private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException {
-   String subFolder = keytabInfo.folder;
-   if (subFolder == null) {
-     subFolder = StringUtils.EMPTY;
-   }
-   Path keytabDir = sliderFileSystem.buildKeytabInstallationDirPath(subFolder);
-   RemoteIterator<LocatedFileStatus> listing =
-       sliderFileSystem.getFileSystem().listFiles(keytabDir, true);
-   log.info("Keytabs:");
-   while (listing.hasNext()) {
-     LocatedFileStatus entry = listing.next();
-     log.info("\t" + entry.getPath().toString());
-   }
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Delete one keytab file from a sub-folder of the keytab installation
-  * directory. Both the folder and the keytab name must be supplied, and the
-  * file must exist.
-  *
-  * @param keytabInfo keytab arguments naming the folder and the keytab
-  * @return EXIT_SUCCESS
-  * @throws BadCommandArgumentsException if the folder or keytab name is
-  *                                      missing
-  * @throws IOException on a filesystem failure
-  */
- private int actionDeleteKeytab(ActionKeytabArgs keytabInfo)
-     throws BadCommandArgumentsException, IOException {
-   boolean folderMissing = StringUtils.isEmpty(keytabInfo.folder);
-   if (folderMissing) {
-     throw new BadCommandArgumentsException(
-         "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
-         + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
-   }
-
-   boolean keytabMissing = StringUtils.isEmpty(keytabInfo.keytab);
-   if (keytabMissing) {
-     throw new BadCommandArgumentsException("A keytab name is required.");
-   }
-
-   Path keytabDir =
-       sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
-   Path target = new Path(keytabDir, keytabInfo.keytab);
-   log.info("Deleting keytab {}", target);
-
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   require(fs.exists(target), "No keytab to delete found at %s",
-       target.toUri());
-   // non-recursive delete: the target is expected to be a single file
-   fs.delete(target, false);
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Copy a local keytab file into a sub-folder of the keytab installation
-  * directory. The directory is given owner-only rwx permissions and the
-  * installed file owner-only rw permissions. An existing file is replaced
-  * only when the overwrite flag is set.
-  *
-  * @param keytabInfo keytab arguments: folder, local keytab path, overwrite
-  * @return EXIT_SUCCESS
-  * @throws BadCommandArgumentsException if the arguments fail validation
-  * @throws IOException on a filesystem failure
-  */
- private int actionInstallKeytab(ActionKeytabArgs keytabInfo)
-     throws BadCommandArgumentsException, IOException {
-   require(isSet(keytabInfo.folder),
-       "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
-       + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
-
-   requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab);
-   File localKeytab = new File(keytabInfo.keytab);
-   require(localKeytab.isFile(),
-       "Unable to access supplied keytab file at %s", localKeytab.getAbsolutePath());
-   Path source = new Path(localKeytab.toURI());
-
-   // Create the destination directory, restricted to the owner
-   Path keytabDir =
-       sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   fs.mkdirs(keytabDir);
-   FsPermission ownerOnlyDir =
-       new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-   fs.setPermission(keytabDir, ownerOnlyDir);
-
-   Path target = new Path(keytabDir, source.getName());
-   log.info("Installing keytab {} at {} and overwrite is {}.",
-       source, target, keytabInfo.overwrite);
-   require(!fs.exists(target) || keytabInfo.overwrite,
-       "Keytab exists at %s. Use --overwrite to overwrite.", target.toUri());
-
-   fs.copyFromLocalFile(false, keytabInfo.overwrite, source, target);
-   FsPermission ownerOnlyFile =
-       new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
-   fs.setPermission(target, ownerOnlyFile);
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Deprecated entry point for installing a keytab: logs a deprecation
-  * warning, then delegates to {@link #actionKeytab(ActionKeytabArgs)}.
-  */
- @Override
- public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
-     throws YarnException, IOException {
-   log.warn("The 'install-keytab' option has been deprecated. Please use 'keytab --install'.");
-   ActionKeytabArgs keytabArgs = new ActionKeytabArgs(installKeytabInfo);
-   return actionKeytab(keytabArgs);
- }
-
- @Override
- public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
- YarnException,
- IOException {
- log.warn("The " + ACTION_INSTALL_PACKAGE
- + " option has been deprecated. Please use '"
- + ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'.");
- if (StringUtils.isEmpty(installPkgInfo.name)) {
- throw new BadCommandArgumentsException(
- E_INVALID_APPLICATION_TYPE_NAME + "\n"
- + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
- }
- Path srcFile = extractPackagePath(installPkgInfo.packageURI);
-
- // Do not provide new options to install-package command as it is in
- // deprecated mode. So version is kept null here. Use package --install.
- Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name,
- null);
- FileSystem sfs = sliderFileSystem.getFileSystem();
- sfs.mkdirs(pkgPath);
-
- Path fileInFs = new Path(pkgPath, srcFile.getName());
- log.info("Installing package {} at {} and overwrite is {}.",
- srcFile, fileInFs, installPkgInfo.replacePkg);
- require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg),
- "Package exists at %s. : %s", fileInFs.toUri(), E_USE_REPLACEPKG_TO_OVERWRITE);
- sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
- return EXIT_SUCCESS;
- }
-
- /**
-  * Dispatch a resource command to the help, install, delete or list
-  * handler, checked in that order. Fails if none of them was selected.
-  */
- @Override
- public int actionResource(ActionResourceArgs resourceInfo)
-     throws YarnException, IOException {
-   if (resourceInfo.help) {
-     actionHelp(ACTION_RESOURCE);
-     return EXIT_SUCCESS;
-   }
-   if (resourceInfo.install) {
-     return actionInstallResource(resourceInfo);
-   }
-   if (resourceInfo.delete) {
-     return actionDeleteResource(resourceInfo);
-   }
-   if (resourceInfo.list) {
-     return actionListResource(resourceInfo);
-   }
-   String usage = CommonArgs.usage(serviceArgs, ACTION_RESOURCE);
-   throw new BadCommandArgumentsException(
-       "Resource option specified not found.\n" + usage);
- }
-
- /**
-  * Log the path of every file found, recursively, under the resource
-  * directory, or under the given sub-folder of it.
-  *
-  * @param resourceInfo resource arguments; a null folder is treated as empty
-  * @return EXIT_SUCCESS
-  * @throws IOException on a filesystem failure
-  */
- private int actionListResource(ActionResourceArgs resourceInfo) throws IOException {
-   String subFolder = resourceInfo.folder;
-   if (subFolder == null) {
-     subFolder = StringUtils.EMPTY;
-   }
-   Path resourceDir = sliderFileSystem.buildResourcePath(subFolder);
-   RemoteIterator<LocatedFileStatus> listing =
-       sliderFileSystem.getFileSystem().listFiles(resourceDir, true);
-   log.info("Resources:");
-   while (listing.hasNext()) {
-     LocatedFileStatus entry = listing.next();
-     log.info("\t" + entry.getPath().toString());
-   }
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Delete a resource, recursively, from the resource directory. When a
-  * folder is given the resource is resolved inside that folder.
-  *
-  * @param resourceInfo resource arguments naming the resource and,
-  *                     optionally, its folder
-  * @return EXIT_SUCCESS
-  * @throws BadCommandArgumentsException if no resource name is given
-  * @throws IOException on a filesystem failure
-  */
- private int actionDeleteResource(ActionResourceArgs resourceInfo)
-     throws BadCommandArgumentsException, IOException {
-   boolean nameMissing = StringUtils.isEmpty(resourceInfo.resource);
-   if (nameMissing) {
-     throw new BadCommandArgumentsException("A file name is required.");
-   }
-
-   Path target = (resourceInfo.folder == null)
-       ? sliderFileSystem.buildResourcePath(resourceInfo.resource)
-       : sliderFileSystem.buildResourcePath(resourceInfo.folder,
-           resourceInfo.resource);
-
-   log.info("Deleting resource {}", target);
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   require(fs.exists(target), "No resource to delete found at %s", target.toUri());
-   fs.delete(target, true);
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Copy a local file, or each entry of a local directory, into the resource
-  * directory or a sub-folder of it. A destination folder created here gets
-  * owner-only rwx permissions; each installed entry gets owner-only rw
-  * permissions. Existing entries are replaced only when the overwrite flag
-  * is set.
-  *
-  * @param resourceInfo resource arguments: local path, folder, overwrite
-  * @return EXIT_SUCCESS
-  * @throws BadCommandArgumentsException if the arguments fail validation
-  * @throws IOException on a filesystem failure, including being unable to
-  *                     list the supplied local directory
-  */
- private int actionInstallResource(ActionResourceArgs resourceInfo)
-     throws BadCommandArgumentsException, IOException {
-   String folder = resourceInfo.folder != null ? resourceInfo.folder : StringUtils.EMPTY;
-
-   requireArgumentSet(Arguments.ARG_RESOURCE, resourceInfo.resource);
-   File file = new File(resourceInfo.resource);
-   require(file.isFile() || file.isDirectory(),
-       "Unable to access supplied file at %s", file.getAbsolutePath());
-
-   File[] files;
-   if (file.isDirectory()) {
-     files = file.listFiles();
-     if (files == null) {
-       // listFiles() returns null when the directory cannot be read
-       throw new IOException("Unable to list contents of directory "
-           + file.getAbsolutePath());
-     }
-   } else {
-     files = new File[] { file };
-   }
-
-   Path pkgPath = sliderFileSystem.buildResourcePath(folder);
-   FileSystem sfs = sliderFileSystem.getFileSystem();
-
-   if (!sfs.exists(pkgPath)) {
-     sfs.mkdirs(pkgPath);
-     sfs.setPermission(pkgPath, new FsPermission(
-         FsAction.ALL, FsAction.NONE, FsAction.NONE));
-   } else {
-     require(sfs.isDirectory(pkgPath), "Specified folder %s exists and is " +
-         "not a directory", folder);
-   }
-
-   for (File f : files) {
-     Path srcFile = new Path(f.toURI());
-
-     Path fileInFs = new Path(pkgPath, srcFile.getName());
-     log.info("Installing file {} at {} and overwrite is {}.",
-         srcFile, fileInFs, resourceInfo.overwrite);
-     require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
-         "File exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
-
-     sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
-     sfs.setPermission(fileInFs,
-         new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
-   }
-
-   return EXIT_SUCCESS;
- }
-
- @Override
- public int actionClient(ActionClientArgs clientInfo) throws
- YarnException,
- IOException {
- if (clientInfo.install) {
- return doClientInstall(clientInfo);
- } else if (clientInfo.getCertStore) {
- return doCertificateStoreRetrieval(clientInfo);
- } else {
- throw new BadCommandArgumentsException(
- "Only install, keystore, and truststore commands are supported for the client.\n"
- + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-
- }
- }
-
- /**
-  * Retrieve a client keystore or truststore from the named application
-  * and write it to a local file that must not already exist.
-  * Exactly one of keystore or truststore must be requested. The store
-  * password comes from the command line, else from the credential provider
-  * and alias if both are given (prompting and storing it when the alias has
-  * no entry), else from a prompt on stdin.
-  *
-  * @param clientInfo client arguments
-  * @return EXIT_SUCCESS
-  * @throws YarnException
-  * @throws IOException on failure to read the password or write the store
-  */
- private int doCertificateStoreRetrieval(ActionClientArgs clientInfo)
-     throws YarnException, IOException {
-   if (clientInfo.keystore != null && clientInfo.truststore != null) {
-     throw new BadCommandArgumentsException(
-         "Only one of either keystore or truststore can be retrieved at one time. "
-         + "Retrieval of both should be done separately\n"
-         + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-   }
-
-   requireArgumentSet(Arguments.ARG_NAME, clientInfo.name);
-
-   File storeFile = null;
-   SecurityStore.StoreType type;
-   if (clientInfo.keystore != null) {
-     storeFile = clientInfo.keystore;
-     type = SecurityStore.StoreType.keystore;
-   } else {
-     storeFile = clientInfo.truststore;
-     type = SecurityStore.StoreType.truststore;
-   }
-
-   if (storeFile == null) {
-     // neither store was named: report bad arguments instead of hitting a
-     // NullPointerException on the existence check below
-     throw new BadCommandArgumentsException(
-         "Either a keystore or a truststore file must be specified.\n"
-         + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-   }
-
-   require (!storeFile.exists(),
-       "File %s already exists. Please remove that file or select a different file name.",
-       storeFile.getAbsolutePath());
-   String hostname = null;
-   if (type == SecurityStore.StoreType.keystore) {
-     hostname = clientInfo.hostname;
-     if (hostname == null) {
-       hostname = InetAddress.getLocalHost().getCanonicalHostName();
-       log.info("No hostname specified via command line. Using {}", hostname);
-     }
-   }
-
-   String password = clientInfo.password;
-   if (password == null) {
-     String provider = clientInfo.provider;
-     String alias = clientInfo.alias;
-     if (provider != null && alias != null) {
-       Configuration conf = new Configuration(getConfig());
-       conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
-       char[] chars = conf.getPassword(alias);
-       if (chars == null) {
-         CredentialProvider credentialProvider =
-             CredentialProviderFactory.getProviders(conf).get(0);
-         chars = readOnePassword(alias);
-         credentialProvider.createCredentialEntry(alias, chars);
-         credentialProvider.flush();
-       }
-       password = String.valueOf(chars);
-       Arrays.fill(chars, ' ');
-     } else {
-       log.info("No password and no provider/alias pair were provided, " +
-           "prompting for password");
-       // get a password, wiping the character array once it has been copied
-       char[] prompted = readOnePassword(type.name());
-       password = String.valueOf(prompted);
-       Arrays.fill(prompted, ' ');
-     }
-   }
-
-   byte[] keystore = createClusterOperations(clientInfo.name)
-       .getClientCertificateStore(hostname, "client", password, type.name());
-   // persist to file; try-with-resources closes the stream on every path
-   try (FileOutputStream storeFileOutputStream =
-       new FileOutputStream(storeFile)) {
-     IOUtils.write(keystore, storeFileOutputStream);
-   } catch (Exception e) {
-     log.error("Unable to persist to file {}", storeFile);
-     throw e;
-   }
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Install the client side of an application package into a local
-  * directory. The package is either the local file named by the package URI
-  * or, when only an application name is given, the default application
-  * package of that name copied into a local temporary directory. An
-  * optional client configuration file is parsed as JSON and handed to the
-  * provider together with the package.
-  *
-  * @param clientInfo client arguments
-  * @return EXIT_SUCCESS
-  * @throws IOException on a filesystem failure
-  * @throws SliderException on invalid arguments or configuration
-  */
- private int doClientInstall(ActionClientArgs clientInfo)
-     throws IOException, SliderException {
-
-   // The install location must be an existing directory
-   File installDir = clientInfo.installLocation;
-   require(installDir != null,
-       E_INVALID_INSTALL_LOCATION +"\n"
-       + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-   require(installDir.exists(),
-       E_INSTALL_PATH_DOES_NOT_EXIST + ": " + installDir.getAbsolutePath());
-   require(installDir.isDirectory(),
-       E_INVALID_INSTALL_PATH + ": " + installDir.getAbsolutePath());
-
-   // Locate the package: an explicit URI wins over the application name
-   boolean hasPackageUri = isSet(clientInfo.packageURI);
-   require(hasPackageUri || isSet(clientInfo.name),
-       E_INVALID_APPLICATION_PACKAGE_LOCATION);
-   File pkgFile;
-   if (hasPackageUri) {
-     pkgFile = new File(clientInfo.packageURI);
-   } else {
-     Path appDirPath = sliderFileSystem.buildAppDefDirPath(clientInfo.name);
-     Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG);
-     require(sliderFileSystem.isFile(appDefPath),
-         E_INVALID_APPLICATION_PACKAGE_LOCATION);
-     // NOTE: this temporary directory is not removed by this method
-     File downloadDir = Files.createTempDir();
-     pkgFile = new File(downloadDir, SliderKeys.DEFAULT_APP_PKG);
-     sliderFileSystem.copyHdfsFileToLocal(appDefPath, pkgFile);
-   }
-   require(pkgFile.isFile(),
-       E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", pkgFile.getAbsolutePath());
-
-   // Parse the optional client configuration
-   JSONObject config = null;
-   if (clientInfo.clientConfig != null) {
-     try {
-       byte[] rawConfig = Files.toByteArray(clientInfo.clientConfig);
-       String configText = new String(rawConfig, Charset.defaultCharset());
-       config = new JSONObject(configText);
-     } catch (JSONException jsonEx) {
-       log.error("Unable to read supplied configuration at {}: {}",
-           clientInfo.clientConfig, jsonEx);
-       log.debug("Unable to read supplied configuration at {}: {}",
-           clientInfo.clientConfig, jsonEx, jsonEx);
-       throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx);
-     }
-   }
-
-   // Only INSTALL is supported
-   AbstractClientProvider provider =
-       createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE);
-   provider.processClientOperation(sliderFileSystem,
-       getRegistryOperations(),
-       getConfig(),
-       "INSTALL",
-       installDir,
-       pkgFile,
-       config,
-       clientInfo.name);
-   return EXIT_SUCCESS;
- }
-
-
- /**
-  * Run every package operation selected in the arguments (help, install,
-  * delete, list, instances), in that order, and return the exit code of the
-  * last one that ran. Output goes to the file named in the arguments, or to
-  * stdout when none is named.
-  *
-  * @throws BadCommandArgumentsException if no operation was selected
-  */
- @Override
- public int actionPackage(ActionPackageArgs actionPackageInfo)
-     throws YarnException, IOException {
-   initializeOutputStream(actionPackageInfo.out);
-   int exitCode = -1;
-   try {
-     if (actionPackageInfo.help) {
-       exitCode = actionHelp(ACTION_PACKAGE);
-     }
-     if (actionPackageInfo.install) {
-       exitCode = actionPackageInstall(actionPackageInfo);
-     }
-     if (actionPackageInfo.delete) {
-       exitCode = actionPackageDelete(actionPackageInfo);
-     }
-     if (actionPackageInfo.list) {
-       exitCode = actionPackageList();
-     }
-     if (actionPackageInfo.instances) {
-       exitCode = actionPackageInstances();
-     }
-   } finally {
-     // close the output file even if one of the operations throws
-     finalizeOutputStream(actionPackageInfo.out);
-   }
-   if (exitCode != -1) {
-     return exitCode;
-   }
-   throw new BadCommandArgumentsException(
-       "Select valid package operation option");
- }
-
- /**
-  * Point the client output stream at the named file, or at stdout when no
-  * file is named.
-  *
-  * @param outFile path of the output file, or null for stdout
-  * @throws FileNotFoundException if the output file cannot be opened
-  */
- private void initializeOutputStream(String outFile)
-     throws FileNotFoundException {
-   clientOutputStream = (outFile == null)
-       ? System.out
-       : new PrintStream(new FileOutputStream(outFile));
- }
-
- /**
-  * Flush and close the client output stream if it was directed to a file,
-  * then reset it to stdout.
-  *
-  * @param outFile path of the output file that was in use, or null if the
-  *                output was going to stdout
-  */
- private void finalizeOutputStream(String outFile) {
-   boolean writingToFile = outFile != null && clientOutputStream != null;
-   if (writingToFile) {
-     clientOutputStream.flush();
-     clientOutputStream.close();
-   }
-   clientOutputStream = System.out;
- }
-
- /**
-  * Print one line per persisted application instance whose application
-  * definition is a file located under the package install directory,
-  * showing the cluster name, package name, package version and the
-  * application definition location. Instances whose application definition
-  * path cannot be determined are skipped silently.
-  *
-  * @return EXIT_SUCCESS
-  * @throws YarnException
-  * @throws IOException
-  */
- private int actionPackageInstances() throws YarnException, IOException {
-   Map<String, Path> persistentInstances =
-       sliderFileSystem.listPersistentInstances();
-   if (persistentInstances.isEmpty()) {
-     log.info("No slider cluster specification available");
-     return EXIT_SUCCESS;
-   }
-   Path packageRoot = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
-       StringUtils.EMPTY);
-   String pkgPathValue = packageRoot.toUri().getPath();
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   log.info("List of applications with its package name and path");
-   println("%-25s %15s %30s %s", "Cluster Name", "Package Name",
-       "Package Version", "Application Location");
-   for (Map.Entry<String, Path> instance : persistentInstances.entrySet()) {
-     String clusterName = instance.getKey();
-     AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-         clusterName, instance.getValue());
-     Path appDefPath;
-     try {
-       String definedPath = getApplicationDefinitionPath(
-           instanceDefinition.getAppConfOperations());
-       appDefPath = new Path(definedPath);
-     } catch (BadConfigException e) {
-       // Invalid cluster state: this is only a listing, so move on to the
-       // next instance without logging anything.
-       continue;
-     }
-     if (!appDefPath.isUriPathAbsolute()) {
-       appDefPath = new Path(fs.getHomeDirectory(), appDefPath);
-     }
-     String appDefPathStr = appDefPath.toUri().toString();
-     try {
-       boolean installedPackage =
-           appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath);
-       if (installedPackage) {
-         // the package name is the parent directory, unless the instance is
-         // versioned: then the parent is the version and the package name
-         // is one level further up
-         Path parentDir = appDefPath.getParent();
-         String packageName = parentDir.getName();
-         String packageVersion = StringUtils.EMPTY;
-         if (instanceDefinition.isVersioned()) {
-           packageVersion = packageName;
-           packageName = appDefPath.getParent().getParent().getName();
-         }
-         println("%-25s %15s %30s %s", clusterName, packageName,
-             packageVersion, appDefPathStr);
-       }
-     } catch (IOException e) {
-       log.debug("{} application definition path {} is not found.", clusterName, appDefPathStr);
-     }
-   }
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Print the name of every directory directly under the package install
-  * path, or log that no packages are installed.
-  *
-  * @return EXIT_SUCCESS
-  * @throws IOException on a filesystem failure
-  */
- private int actionPackageList() throws IOException {
-   Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
-       StringUtils.EMPTY);
-   log.info("Package install path : {}", pkgPath);
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   if (!fs.isDirectory(pkgPath)) {
-     log.info("No package(s) installed");
-     return EXIT_SUCCESS;
-   }
-
-   StringBuilder listing = new StringBuilder("List of installed packages:\n");
-   int packageCount = 0;
-   for (FileStatus entry : fs.listStatus(pkgPath)) {
-     if (!entry.isDirectory()) {
-       // only directories count as packages
-       continue;
-     }
-     listing.append("\t").append(entry.getPath().getName()).append("\n");
-     packageCount++;
-   }
-
-   if (packageCount > 0) {
-     println(listing.toString());
-   } else {
-     log.info("No package(s) installed");
-   }
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Copy the metainfo file out of an application package into a summary file
-  * next to the installed package, named after the package with a
-  * ".metainfo.json" or ".metainfo.xml" suffix. A JSON metainfo is preferred
-  * over an XML one; nothing is written if the package contains neither.
-  *
-  * @param srcFile the application package to read the metainfo from
-  * @param destFile the installed package; the summary is its sibling
-  * @param overwrite true if an existing summary file may be overwritten
-  * @throws IOException on a filesystem failure
-  */
- private void createSummaryMetainfoFile(Path srcFile, Path destFile,
-     boolean overwrite) throws IOException {
-   FileSystem srcFs = srcFile.getFileSystem(getConfig());
-   try (InputStream jsonMetainfo = SliderUtils
-            .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.json");
-        InputStream xmlMetainfo = SliderUtils
-            .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.xml")) {
-     InputStream metainfo;
-     Path summaryFileInFs;
-     if (jsonMetainfo != null) {
-       metainfo = jsonMetainfo;
-       summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
-           + ".metainfo.json");
-       log.info("Found JSON metainfo file in package");
-     } else if (xmlMetainfo != null) {
-       metainfo = xmlMetainfo;
-       summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
-           + ".metainfo.xml");
-       log.info("Found XML metainfo file in package");
-     } else {
-       // no metainfo in the package: nothing to summarize
-       return;
-     }
-     FileSystem destFs = sliderFileSystem.getFileSystem();
-     try (FSDataOutputStream summaryOut =
-              destFs.create(summaryFileInFs, overwrite)) {
-       log.info("Creating summary metainfo file");
-       IOUtils.copy(metainfo, summaryOut);
-     }
-   }
- }
-
- /**
-  * Install a local application package into the package directory for the
-  * given name and version, create its summary metainfo file, and log the
-  * installed path, relative to the home directory where possible, for use
-  * in the application configuration.
-  *
-  * @param actionPackageArgs package arguments: name, version, package URI,
-  *                          replace flag
-  * @return EXIT_SUCCESS
-  * @throws YarnException
-  * @throws IOException on a filesystem failure
-  */
- private int actionPackageInstall(ActionPackageArgs actionPackageArgs)
-     throws YarnException, IOException {
-   requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
-
-   Path source = extractPackagePath(actionPackageArgs.packageURI);
-
-   Path packageDir = sliderFileSystem.buildPackageDirPath(
-       actionPackageArgs.name, actionPackageArgs.version);
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   if (!fs.exists(packageDir)) {
-     fs.mkdirs(packageDir);
-   }
-
-   Path target = new Path(packageDir, source.getName());
-   require(actionPackageArgs.replacePkg || !fs.exists(target),
-       E_PACKAGE_EXISTS +" at %s. Use --replacepkg to overwrite.", target.toUri());
-
-   log.info("Installing package {} to {} (overwrite set to {})", source,
-       target, actionPackageArgs.replacePkg);
-   fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, source, target);
-   createSummaryMetainfoFile(source, target, actionPackageArgs.replacePkg);
-
-   String installedPath =
-       Path.getPathWithoutSchemeAndAuthority(target).toString();
-   String homeDir =
-       Path.getPathWithoutSchemeAndAuthority(fs.getHomeDirectory()).toString();
-   // Strip the home directory and any separator that follows it; both
-   // separators are handled so this works on windows and unix.
-   String relativePath = installedPath;
-   if (installedPath.startsWith(homeDir)) {
-     relativePath = installedPath.substring(homeDir.length());
-     boolean leadingSeparator =
-         relativePath.startsWith("/") || relativePath.startsWith("\\");
-     if (leadingSeparator) {
-       relativePath = relativePath.substring(1);
-     }
-   }
-   log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}",
-       relativePath);
-
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Turn a package location given on the command line into a Path, after
-  * checking that it is set and names an existing local file.
-  *
-  * @param packageURI local path of the package file
-  * @return the package file as a Path built from its file URI
-  * @throws BadCommandArgumentsException if the location is unset or is not
-  *                                      a file
-  */
- private Path extractPackagePath(String packageURI)
-     throws BadCommandArgumentsException {
-   require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION);
-   File localPackage = new File(packageURI);
-   String unreadableMessage = E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ": "
-       + localPackage.getAbsolutePath();
-   require(localPackage.isFile(), unreadableMessage);
-   return new Path(localPackage.toURI());
- }
-
- /**
-  * Delete, recursively, the package directory for the given name and
-  * version. The directory must exist.
-  *
-  * @param actionPackageArgs package arguments naming the package and,
-  *                          optionally, its version
-  * @return EXIT_SUCCESS if the directory was deleted, EXIT_NOT_FOUND if the
-  *         filesystem reported that nothing was deleted
-  * @throws YarnException
-  * @throws IOException on a filesystem failure
-  */
- private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws
-     YarnException, IOException {
-   requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
-
-   Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
-       actionPackageArgs.version);
-   FileSystem fs = sliderFileSystem.getFileSystem();
-   require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", pkgPath.toUri());
-   log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath);
-
-   if(fs.delete(pkgPath, true)) {
-     // pass the name as a logging argument so it fills the placeholder
-     log.info("Deleted package {}", actionPackageArgs.name);
-     return EXIT_SUCCESS;
-   } else {
-     log.warn("Package deletion failed.");
-     return EXIT_NOT_FOUND;
-   }
- }
-
- /**
-  * Rebuild and persist an application instance definition. An existing
-  * definition may be overwritten and the cluster may be live.
-  */
- @Override
- public int actionUpdate(String clustername,
-     AbstractClusterBuildingActionArgs buildInfo)
-     throws YarnException, IOException {
-   final boolean overwrite = true;
-   final boolean liveClusterAllowed = true;
-   buildInstanceDefinition(clustername, buildInfo, overwrite,
-       liveClusterAllowed);
-   return EXIT_SUCCESS;
- }
-
- /**
-  * Build up the AggregateConfiguration for an application instance, then
-  * persist it. This is the non-upgrade variant: it delegates to the
-  * five-argument overload with the upgrade flag set to false.
-  * @param clustername name of the cluster
-  * @param buildInfo the arguments needed to build the cluster
-  * @param overwrite true if existing cluster directory can be overwritten
-  * @param liveClusterAllowed true if live cluster can be modified
-  * @throws YarnException
-  * @throws IOException
-  */
- public void buildInstanceDefinition(String clustername,
-     AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
-     boolean liveClusterAllowed) throws YarnException, IOException {
-   final boolean isUpgradeFlow = false;
-   buildInstanceDefinition(clustername, buildInfo, overwrite,
-       liveClusterAllowed, isUpgradeFlow);
- }
-
- public void buildInstanceDefinition(String clustername,
- AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
- boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
- IOException {
- // verify that a live cluster isn't there
- validateClusterName(clustername);
- verifyBindingsDefined();
- if (!liveClusterAllowed) {
- verifyNoLiveClusters(clustername, "Create");
- }
-
- Configuration conf = getConfig();
- String registryQuorum = lookupZKQuorum();
-
- Path appconfdir = buildInfo.getConfdir();
- // Provider
- String providerName = buildInfo.getProvider();
- requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
- log.debug("Provider is {}", providerName);
- SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
- AbstractClientProvider provider =
- createClientProvider(providerName);
- InstanceBuilder builder =
- new InstanceBuilder(sliderFileSystem,
- getConfig(),
- clustername);
-
- AggregateConf instanceDefinition = new AggregateConf();
- ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
- ConfTreeOperations resources = instanceDefinition.getResourceOperations();
- ConfTreeOperations internal = instanceDefinition.getInternalOperations();
- //initial definition is set by the providers
- sliderAM.prepareInstanceConfiguration(instanceDefinition);
- provider.prepareInstanceConfiguration(instanceDefinition);
-
- //load in any specified on the command line
- if (buildInfo.resources != null) {
- try {
- resources.mergeFile(buildInfo.resources,
- new ResourcesInputPropertiesValidator());
-
- } catch (IOException e) {
- throw new BadConfigException(e,
- "incorrect argument to %s: \"%s\" : %s ",
- Arguments.ARG_RESOURCES,
- buildInfo.resources,
- e.toString());
- }
- }
- if (buildInfo.template != null) {
- try {
- appConf.mergeFile(buildInfo.template,
- new TemplateInputPropertiesValidator());
- } catch (IOException e) {
- throw new BadConfigException(e,
- "incorrect argument to %s: \"%s\" : %s ",
- Arguments.ARG_TEMPLATE,
- buildInfo.template,
- e.toString());
- }
- }
-
- if (isUpgradeFlow) {
- ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
- if (!upgradeInfo.force) {
- validateClientAndClusterResource(clustername, resources);
- }
- }
-
- //get the command line options
- ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
- ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
-
- appConf.merge(cmdLineAppOptions);
-
- AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
- appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
-
- // put the role counts into the resources file
- Map<String, String> argsRoleMap = buildInfo.getComponentMap();
- for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
- String count = roleEntry.getValue();
- String key = roleEntry.getKey();
- log.info("{} => {}", key, count);
- resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
- }
-
- //all CLI role options
- Map<String, Map<String, String>> appOptionMap =
- buildInfo.getCompOptionMap();
- appConf.mergeComponents(appOptionMap);
-
- //internal picks up core. values only
- internal.propagateGlobalKeys(appConf, "slider.");
- internal.propagateGlobalKeys(appConf, "internal.");
-
- //copy over role. and yarn. values ONLY to the resources
- if (PROPAGATE_RESOURCE_OPTION) {
- resources.propagateGlobalKeys(appConf, "component.");
- resources.propagateGlobalKeys(appConf, "role.");
- resources.propagateGlobalKeys(appConf, "yarn.");
- resources.mergeComponentsPrefix(appOptionMap, "component.", true);
- resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
- resources.mergeComponentsPrefix(appOptionMap, "role.", true);
- }
-
- // resource component args
- appConf.merge(cmdLineResourceOptions);
- resources.merge(cmdLineResourceOptions);
- resources.mergeComponents(buildInfo.getResourceCompOptionMap());
-
- builder.init(providerName, instanceDefinition);
- builder.resolve();
- builder.propagateFilename();
- builder.propagatePrincipals();
- builder.setImageDetailsIfAvailable(buildInfo.getImage(),
- buildInfo.getAppHomeDir());
- builder.setQueue(buildInfo.queue);
-
- String quorum = buildInfo.getZKhosts();
- if (isUnset(quorum)) {
- quorum = registryQuorum;
- }
- if (isUnset(quorum)) {
- throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
- }
- ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
- getUsername(),
- clustername,
- registryQuorum,
- quorum);
- String zookeeperRoot = buildInfo.getAppZKPath();
-
- if (isSet(zookeeperRoot)) {
- zkPaths.setAppPath(zookeeperRoot);
- } else {
- String createDefaultZkNode = appConf.getGlobalOptions()
- .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
- if (createDefaultZkNode.equals("true")) {
- String defaultZKPath = createZookeeperNode(clustername, false);
- log.debug("ZK node created for application instance: {}", defaultZKPath);
- if (defaultZKPath != null) {
- zkPaths.setAppPath(defaultZKPath);
- }
- } else {
- // create AppPath if default is being used
- String defaultZKPath = createZookeeperNode(clustername, true);
- log.debug("ZK node assigned to application instance: {}", defaultZKPath);
- zkPaths.setAppPath(defaultZKPath);
- }
- }
-
- builder.addZKBinding(zkPaths);
-
- //then propagate any package URI
- if (buildInfo.packageURI != null) {
- appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
- }
-
- propagatePythonExecutable(conf, instanceDefinition);
-
- // make any substitutions needed at this stage
- replaceTokens(appConf.getConfTree(), getUsername(), clustername);
-
- // TODO: Refactor the validation code and persistence code
- try {
- persistInstanceDefinition(overwrite, appconfdir, builder);
- appDefinitionPersister.persistPackages();
-
- } catch (LockAcquireFailedException e) {
- log.warn("Failed to get a Lock on {} : {}", builder, e, e);
- throw new BadClusterStateException("Failed to save " + clustername
- + ": " + e);
- }
-
- // providers to validate what there is
- // TODO: Validation should be done before persistence
- AggregateConf instanceDescription = builder.getInstanceDescription();
- validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
- validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
- }
-
- /**
-  * Compare the resource definition supplied for an upgrade with the
-  * persisted definition of the cluster.
-  * <p>
-  * Every component other than the Slider AM must be present on both sides
-  * with the same instance count. Components that differ are logged and the
-  * mismatch is reported by throwing a {@code BadConfigException}.
-  * A component with no instance count is treated as having the count -1 on
-  * both the client and the cluster side.
-  *
-  * @param clustername name of the cluster whose persisted definition is loaded
-  * @param clientResources resource definition supplied by the client
-  * @throws BadClusterStateException if the lock on the persisted definition
-  *         cannot be acquired
-  * @throws SliderException on a mismatch between the two definitions
-  * @throws IOException as raised when loading the persisted definition
-  */
- private void validateClientAndClusterResource(String clustername,
-     ConfTreeOperations clientResources) throws BadClusterStateException,
-     SliderException, IOException {
-   log.info("Validating upgrade resource definition with current cluster "
-       + "state (components and instance count)");
-   Map<String, Integer> clientComponentInstances = new HashMap<>();
-   for (String componentName : clientResources.getComponentNames()) {
-     if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
-       clientComponentInstances.put(componentName,
-           clientResources.getComponentOptInt(componentName,
-               COMPONENT_INSTANCES, -1));
-     }
-   }
-
-   AggregateConf clusterConf;
-   try {
-     clusterConf = loadPersistedClusterDescription(clustername);
-   } catch (LockAcquireFailedException e) {
-     log.warn("Failed to get a Lock on cluster resource : {}", e, e);
-     throw new BadClusterStateException(
-         "Failed to load cluster resource definition " + clustername + ": " + e, e);
-   }
-   Map<String, Integer> clusterComponentInstances = new HashMap<>();
-   for (Map.Entry<String, Map<String, String>> component : clusterConf
-       .getResources().components.entrySet()) {
-     if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
-       // A missing count must not reach Integer.decode(), which rejects null;
-       // use the same -1 default that is applied to the client definition.
-       String instances = component.getValue().get(COMPONENT_INSTANCES);
-       clusterComponentInstances.put(component.getKey(),
-           instances == null ? Integer.valueOf(-1) : Integer.decode(instances));
-     }
-   }
-
-   // client and cluster should be an exact match: drop every component whose
-   // count agrees on both sides, whatever is left over is a mismatch
-   Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt =
-       clientComponentInstances.entrySet().iterator();
-   while (clientComponentInstanceIt.hasNext()) {
-     Map.Entry<String, Integer> clientEntry = clientComponentInstanceIt.next();
-     Integer clusterCount = clusterComponentInstances.get(clientEntry.getKey());
-     if (clusterCount != null
-         && clusterCount.intValue() == clientEntry.getValue().intValue()) {
-       clusterComponentInstances.remove(clientEntry.getKey());
-       clientComponentInstanceIt.remove();
-     }
-   }
-
-   if (!clientComponentInstances.isEmpty()
-       || !clusterComponentInstances.isEmpty()) {
-     log.error("Mismatch found in upgrade resource definition and cluster "
-         + "resource state");
-     if (!clientComponentInstances.isEmpty()) {
-       log.info("The upgrade resource definitions that do not match are:");
-       for (Map.Entry<String, Integer> clientEntry
-           : clientComponentInstances.entrySet()) {
-         log.info("    Component Name: {}, Instance count: {}",
-             clientEntry.getKey(),
-             clientEntry.getValue());
-       }
-     }
-     if (!clusterComponentInstances.isEmpty()) {
-       log.info("The cluster resources that do not match are:");
-       for (Map.Entry<String, Integer> clusterEntry
-           : clusterComponentInstances.entrySet()) {
-         log.info("    Component Name: {}, Instance count: {}",
-             clusterEntry.getKey(),
-             clusterEntry.getValue());
-       }
-     }
-     throw new BadConfigException("Resource definition provided for "
-         + "upgrade does not match with that of the currently running "
-         + "cluster.\nIf you are aware of what you are doing, rerun the "
-         + "command with " + Arguments.ARG_FORCE + " option.");
-   }
- }
-
- /**
-  * Persist the instance definition held by the builder.
-  * This simply delegates to {@code builder.persist(appconfdir, overwrite)}.
-  *
-  * @param overwrite passed through to the builder's persist call
-  * @param appconfdir passed through to the builder's persist call
-  * @param builder builder whose definition is persisted
-  * @throws IOException as raised by the builder
-  * @throws SliderException as raised by the builder
-  * @throws LockAcquireFailedException as raised by the builder
-  */
- protected void persistInstanceDefinition(
-     boolean overwrite, Path appconfdir, InstanceBuilder builder)
-     throws IOException, SliderException, LockAcquireFailedException {
-   builder.persist(appconfdir, overwrite);
- }
-
- /**
-  * Apply token substitution to every string held in a configuration tree:
-  * the values of the global options, the values of each component's
-  * options, and both the keys and the list entries of the credentials.
-  * The tree is updated in place.
-  *
-  * @param conf configuration tree to update
-  * @param userName user name handed to the per-string substitution
-  * @param clusterName cluster name handed to the per-string substitution
-  * @throws IOException as declared by the per-string substitution
-  */
- @VisibleForTesting
- public static void replaceTokens(ConfTree conf,
-     String userName, String clusterName) throws IOException {
-   // global options: compute the new values first, then write them back
-   Map<String,String> resolvedGlobal = new HashMap<>();
-   for (Entry<String,String> option : conf.global.entrySet()) {
-     String resolved = replaceTokens(option.getValue(), userName, clusterName);
-     resolvedGlobal.put(option.getKey(), resolved);
-   }
-   conf.global.putAll(resolvedGlobal);
-
-   // per-component options, handled the same way one component at a time
-   for (Map<String,String> options : conf.components.values()) {
-     Map<String,String> resolvedOptions = new HashMap<>();
-     for (Entry<String,String> option : options.entrySet()) {
-       String resolved =
-           replaceTokens(option.getValue(), userName, clusterName);
-       resolvedOptions.put(option.getKey(), resolved);
-     }
-     options.putAll(resolvedOptions);
-   }
-
-   // credentials: the keys change too, so the map is rebuilt from scratch
-   Map<String,List<String>> resolvedCredentials = new HashMap<>();
-   for (Entry<String,List<String>> credential : conf.credentials.entrySet()) {
-     List<String> resolvedAliases = new ArrayList<>();
-     for (String alias : credential.getValue()) {
-       resolvedAliases.add(replaceTokens(alias, userName, clusterName));
-     }
-     String resolvedKey =
-         replaceTokens(credential.getKey(), userName, clusterName);
-     resolvedCredentials.put(resolvedKey, resolvedAliases);
-   }
-   conf.credentials.clear();
-   conf.credentials.putAll(resolvedCredentials);
- }
-
- /**
-  * Substitute the user-name tokens in a single string: every occurrence of
-  * {@code ${USER}} and of {@code ${USER_NAME}} is replaced by
-  * {@code userName}.
-  * <p>
-  * The substitution is literal on both sides. A regex-based
-  * {@code replaceAll} would interpret {@code $} and {@code \} in the user
-  * name as group references or escapes, failing on names such as
-  * {@code svc$}; {@code String.replace} inserts the name verbatim.
-  *
-  * @param s string to process
-  * @param userName value substituted for the user tokens
-  * @param clusterName cluster name; not used by this method
-  * @return the string with the user tokens replaced
-  * @throws IOException declared in the signature; nothing in this method
-  *         raises it
-  */
- private static String replaceTokens(String s, String userName,
-     String clusterName) throws IOException {
-   return s.replace("${USER}", userName)
-       .replace("${USER_NAME}", userName);
- }
-
- /**
-  * Build the permissions to apply to the cluster directory.
-  * The permission string is read from the
-  * {@code CLUSTER_DIRECTORY_PERMISSIONS} option of the configuration, with
-  * {@code DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS} used when it is not set.
-  *
-  * @param conf configuration to read the option from
-  * @return the permissions built from that string
-  */
- public FsPermission getClusterDirectoryPermissions(Configuration conf) {
-   return new FsPermission(
-       conf.get(CLUSTER_DIRECTORY_PERMISSIONS,
-           DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS));
- }
-
- /**
- * Verify that the Resource Manager is configured (on a non-HA cluster).
- * with a useful error message
- * @throws BadCommandArgumentsException the exception raised on an invalid config
- */
- public void verifyBindingsDefined() throws BadCommandArgumentsException {
- InetSocketAddress rmAddr = getRmAddress(getConfig());
- if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false)
- && !isAddressDefined(rmAddr)) {
- throw new BadCommandArgumentsException(
- E_NO_RESOURCE_MANAGER
- + " in the argument "
- + Arguments.ARG_MANAGER
- + " or the configuration property "
- + YarnConfiguration.RM_ADDRESS
- + " value :" + rmAddr);
- }
- }
-
- /**
-  * Load a cluster specification and launch it.
-  * This assumes that all validation of the arguments and of the cluster
-  * state has already taken place.
-  * <p>
-  * If the launch arguments name an output file, the application report of
-  * the launched application is serialized to it. If they give a positive
-  * wait time, the call then waits for the application to be running.
-  *
-  * @param clustername name of the cluster.
-  * @param launchArgs launch arguments
-  * @return the result of waiting for the application when a positive wait
-  *         time was given, otherwise {@code EXIT_SUCCESS}
-  * @throws YarnException
-  * @throws IOException
-  */
- protected int startCluster(String clustername, LaunchArgsAccessor launchArgs)
-     throws YarnException, IOException {
-   Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-   AggregateConf instanceDefinition =
-       loadInstanceDefinitionUnresolved(clustername, clusterDirectory);
-
-   LaunchedApplication launched = launchApplication(
-       clustername, clusterDirectory, instanceDefinition,
-       serviceArgs.isDebug());
-
-   if (launchArgs.getOutputFile() != null) {
-     // an output file was requested: serialize the application report to it
-     ApplicationReport appReport = launched.getApplicationReport();
-     SerializedApplicationReport serialized =
-         new SerializedApplicationReport(appReport);
-     serialized.submitTime = System.currentTimeMillis();
-     ApplicationReportSerDeser reportWriter = new ApplicationReportSerDeser();
-     reportWriter.save(serialized, launchArgs.getOutputFile());
-   }
-
-   int waittime = launchArgs.getWaittime();
-   if (waittime <= 0) {
-     // no waiting
-     return EXIT_SUCCESS;
-   }
-   return waitForAppRunning(launched, waittime, waittime);
- }
-
- /**
- * Load the instance definition. It is not resolved at this point
- * @param name cluster name
- * @param clusterDirectory cluster dir
- * @return the loaded configuration
- * @throws IOException
- * @throws SliderException
- * @throws UnknownApplicationInstanceException if the file is not found
- */
- public AggregateConf loadInstanceDefinitionUnresolved(String name,
- Path clusterDirectory) throws IOException, SliderException {
-
- try {
- AggregateConf definition =
- InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
- clusterDirectory);
- definition.setName(name);
- return definition;
- } catch (FileNotFoundException e) {
- throw UnknownApplicationInstanceException.unknownInstance(name, e);
- }
- }
-
- /**
- * Load the instance definition.
- * @param name cluster name
- * @param resolved flag to indicate the cluster should be resolved
- * @return the loaded configuration
- * @throws IOException IO problems
- * @throws SliderException slider explicit issues
- * @throws UnknownApplicationInstanceException if the file is not found
- */
- public AggregateConf loadInstanceDefinition(String name,
- boolean resolved) throws
- IOException,
- SliderException {
-
- Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
- AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
- name,
- clusterDirectory);
- if (resolved) {
- instanceDefinition.resolve();
- }
- return instanceDefinition;
-
- }
-
- protected AppMasterLauncher setupAppMasterLauncher(String clustername,
- Path clusterDirectory,
- AggregateConf instanceDefinition,
- boolean debugAM)
- throws YarnException, IOException{
- deployedClusterName = clustername;
- validateClusterName(clustername);
- verifyNoLiveClusters(clustername, "Launch");
- Configuration config = getConfig();
- lookupZKQuorum();
- boolean clusterSecure = isHadoopClusterSecure(config);
- //create the Slider AM provider -this helps set up the AM
- SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
-
- instanceDefinition.resolve();
- launchedInstanceDefinition = instanceDefinition;
-
- ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations();
- MapOperations internalOptions = internalOperations.getGlobalOptions();
- ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations();
- ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations();
- Path generatedConfDirPath =
- createPathThatMustExist(internalOptions.getMandatoryOption(
- INTERNAL_GENERATED_CONF_PATH));
- Path snapshotConfPath =
- createPathThatMustExist(internalOptions.getMandatoryOption(
- INTERNAL_SNAPSHOT_CONF_PATH));
-
-
- // cluster Provider
- AbstractClientProvider provider = createClientProvider(
- internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
- if (log.isDebugEnabled()) {
- log.debug(instanceDefinition.toString());
- }
- MapOperations sliderAMResourceComponent =
- resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
- MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
-
- // add the tags if available
- Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
- getApplicationDefinitionPath(appOperations));
-
- Credentials credentials = null;
- if (clusterSecure) {
- // pick up oozie credentials
- credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
- config);
- if (credentials == null) {
- // nothing from oozie, so build up directly
- credentials = new Credentials(
- UserGroupInformation.getCurrentUser().getCredentials());
- CredentialUtils.addRMRenewableFSDelegationTokens(config,
- sliderFileSystem.getFileSystem(),
- credentials);
- CredentialUtils.addRMDelegationToken(yarnClient, credentials);
-
- } else {
- log.info("Using externally supplied credentials to launch AM");
- }
- }
-
- AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
- SliderKeys.APP_TYPE,
- config,
- sliderFileSystem,
- yarnClient,
- clusterSecure,
- sliderAMResourceComponent,
- resourceGlobalOptions,
- applicationTags,
- credentials);
-
- ApplicationId appId = amLauncher.getApplicationId();
- // set the application name;
- amLauncher.setKeepContainersOverRestarts(true);
-
- int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
- amLauncher.setMaxAppAttempts(maxAppAttempts);
-
- sliderFileSystem.purgeAppInstanceTempFiles(clustername);
- Path tempPath = sliderFileSystem.createAppInstanceTempPath(
- clustername,
- appId.toString() + "/am");
- String libdir = "lib";
- Path libPath = new Path(tempPath, libdir);
- sliderFileSystem.getFileSystem().mkdirs(libPath);
- log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath);
-
- // set local resources for the application master
- // local files or archives as needed
- // In this scenario, the jar file for the application master is part of the local resources
- Map<String, LocalResource> localResources = amLauncher.getLocalResources();
-
- // look for the configuration directory named on the command line
- boolean hasServerLog4jProperties = false;
- Path remoteConfPath = null;
- String relativeConfDir = null;
- String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
- if (isUnset(confdirProp)) {
- log.debug("No local configuration directory provided as system property");
- } else {
- File confDir = new File(confdirProp);
- if (!confDir.exists()) {
- throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
- confDir);
- }
- Path localConfDirPath = createLocalPath(confDir);
- remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
- log.debug("Slider configuration directory is {}; remote to be {}",
- localConfDirPath, remoteConfPath);
- copyDirectory(config, localConfDirPath, remoteConfPath, null);
-
- File log4jserver =
- new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
- hasServerLog4jProperties = log4jserver.isFile();
- }
- // the assumption here is that minimr cluster => this is a test run
- // and the classpath can look after itself
-
- boolean usingMiniMRCluster = getUsingMiniMRCluster();
- if (!usingMiniMRCluster) {
-
- log.debug("Destination is not a MiniYARNCluster -copying full classpath");
-
- // insert conf dir first
- if (remoteConfPath != null) {
- relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
- Map<String, LocalResource> submittedConfDir =
- sliderFileSystem.submitDirectory(remoteConfPath,
- relativeConfDir);
- mergeMaps(localResources, submittedConfDir);
- }
- }
- // build up the configuration
- // IMPORTANT: it is only after this call that site configurations
- // will be valid.
-
- propagatePrincipals(config, instanceDefinition);
- // validate security data
-
-/*
- // turned off until tested
- SecurityConfiguration securityConfiguration =
- new SecurityConfiguration(config,
- instanceDefinition, clustername);
-
-*/
- Configuration clientConfExtras = new Configuration(false);
- // then build up the generated path.
- FsPermission clusterPerms = getClusterDirectoryPermissions(config);
- copyDirectory(config, snapshotConfPath, generatedConfDirPath,
- clusterPerms);
-
-
- // standard AM resources
- sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
- config,
- amLauncher,
- instanceDefinition,
-
<TRUNCATED>