You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:10:32 UTC
[32/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
new file mode 100644
index 0000000..8210f4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -0,0 +1,4569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.slider.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.KerberosDiags;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.SliderApplicationApi;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.StateValues;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.types.ContainerInformation;
+import org.apache.slider.api.types.NodeInformationList;
+import org.apache.slider.api.types.SliderInstanceDescription;
+import org.apache.slider.client.ipc.SliderApplicationIpcClient;
+import org.apache.slider.client.ipc.SliderClusterOperations;
+import org.apache.slider.common.Constants;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.params.AbstractActionArgs;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+import org.apache.slider.common.params.ActionAMSuicideArgs;
+import org.apache.slider.common.params.ActionClientArgs;
+import org.apache.slider.common.params.ActionCreateArgs;
+import org.apache.slider.common.params.ActionDependencyArgs;
+import org.apache.slider.common.params.ActionDestroyArgs;
+import org.apache.slider.common.params.ActionDiagnosticArgs;
+import org.apache.slider.common.params.ActionEchoArgs;
+import org.apache.slider.common.params.ActionExistsArgs;
+import org.apache.slider.common.params.ActionFlexArgs;
+import org.apache.slider.common.params.ActionFreezeArgs;
+import org.apache.slider.common.params.ActionInstallKeytabArgs;
+import org.apache.slider.common.params.ActionInstallPackageArgs;
+import org.apache.slider.common.params.ActionKDiagArgs;
+import org.apache.slider.common.params.ActionKeytabArgs;
+import org.apache.slider.common.params.ActionKillContainerArgs;
+import org.apache.slider.common.params.ActionListArgs;
+import org.apache.slider.common.params.ActionLookupArgs;
+import org.apache.slider.common.params.ActionNodesArgs;
+import org.apache.slider.common.params.ActionPackageArgs;
+import org.apache.slider.common.params.ActionRegistryArgs;
+import org.apache.slider.common.params.ActionResolveArgs;
+import org.apache.slider.common.params.ActionResourceArgs;
+import org.apache.slider.common.params.ActionStatusArgs;
+import org.apache.slider.common.params.ActionThawArgs;
+import org.apache.slider.common.params.ActionTokensArgs;
+import org.apache.slider.common.params.ActionUpgradeArgs;
+import org.apache.slider.common.params.Arguments;
+import org.apache.slider.common.params.ClientArgs;
+import org.apache.slider.common.params.CommonArgs;
+import org.apache.slider.common.params.LaunchArgsAccessor;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.Duration;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.common.tools.SliderVersionInfo;
+import org.apache.slider.core.build.InstanceBuilder;
+import org.apache.slider.core.build.InstanceIO;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
+import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.NotFoundException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
+import org.apache.slider.core.exceptions.UsageException;
+import org.apache.slider.core.exceptions.WaitTimeoutException;
+import org.apache.slider.core.launch.AppMasterLauncher;
+import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.CredentialUtils;
+import org.apache.slider.core.launch.JavaCommandLineBuilder;
+import org.apache.slider.core.launch.LaunchedApplication;
+import org.apache.slider.core.launch.RunningApplication;
+import org.apache.slider.core.launch.SerializedApplicationReport;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.core.persist.AppDefinitionPersister;
+import org.apache.slider.core.persist.ApplicationReportSerDeser;
+import org.apache.slider.core.persist.ConfPersister;
+import org.apache.slider.core.persist.JsonSerDeser;
+import org.apache.slider.core.persist.LockAcquireFailedException;
+import org.apache.slider.core.registry.SliderRegistryUtils;
+import org.apache.slider.core.registry.YarnAppListClient;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
+import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.slider.core.zk.ZKPathBuilder;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.SliderProviderFactory;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.slideram.SliderAMClientProvider;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.rpc.RpcBinder;
+import org.apache.slider.server.services.security.SecurityStore;
+import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
+import static org.apache.slider.api.InternalKeys.*;
+import static org.apache.slider.api.OptionKeys.*;
+import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
+import static org.apache.slider.common.params.SliderActions.*;
+import static org.apache.slider.common.tools.SliderUtils.*;
+
+
+/**
+ * Client service for Slider
+ */
+
+public class SliderClient extends AbstractSliderLaunchedService implements RunService,
+ SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
+ private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
+ public static final String E_MUST_BE_A_VALID_JSON_FILE
+ = "Invalid configuration. Must be a valid json file.";
+ public static final String E_INVALID_INSTALL_LOCATION
+ = "A valid install location must be provided for the client.";
+ public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
+ = "Unable to read supplied package file";
+ public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION
+ = "A valid application package location required.";
+ public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory";
+ public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist";
+ public static final String E_INVALID_APPLICATION_TYPE_NAME
+ = "A valid application type name is required (e.g. HBASE).";
+ public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite.";
+ public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist";
+ public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined";
+ public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
+ public static final String E_PACKAGE_EXISTS = "Package exists";
+ private static PrintStream clientOutputStream = System.out;
+
+ // value should not be changed without updating string find in slider.py
+ private static final String PASSWORD_PROMPT = "Enter password for";
+
+ private ClientArgs serviceArgs;
+ public ApplicationId applicationId;
+
+ private String deployedClusterName;
+ /**
+ * Cluster operations against the deployed cluster -will be null
+ * if no bonding has yet taken place
+ */
+ private SliderClusterOperations sliderClusterOperations;
+
+ protected SliderFileSystem sliderFileSystem;
+
+ /**
+ * Yarn client service
+ */
+ private SliderYarnClientImpl yarnClient;
+ private YarnAppListClient yarnAppListClient;
+ private AggregateConf launchedInstanceDefinition;
+
+ /**
+ * The YARN registry service
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private RegistryOperations registryOperations;
+
+ /**
+ * Constructor
+ */
+ public SliderClient() {
+ super("Slider Client");
+ new HdfsConfiguration();
+ new YarnConfiguration();
+ }
+
+ /**
+ * This is called <i>Before serviceInit is called</i>
+ * @param config the initial configuration build up by the
+ * service launcher.
+ * @param args argument list list of arguments passed to the command line
+ * after any launcher-specific commands have been stripped.
+ * @return the post-binding configuration to pass to the <code>init()</code>
+ * operation.
+ * @throws Exception
+ */
+ @Override
+ public Configuration bindArgs(Configuration config, String... args) throws Exception {
+ config = super.bindArgs(config, args);
+ serviceArgs = new ClientArgs(args);
+ serviceArgs.parse();
+ // add the slider XML config
+ ConfigHelper.injectSliderXMLResource();
+ // yarn-ify
+ YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
+ return patchConfiguration(yarnConfiguration);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ Configuration clientConf = loadSliderClientXML();
+ ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true);
+ serviceArgs.applyDefinitions(conf);
+ serviceArgs.applyFileSystemBinding(conf);
+ AbstractActionArgs coreAction = serviceArgs.getCoreAction();
+ // init security with our conf
+ if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) {
+ forceLogin();
+ initProcessSecurity(conf);
+ }
+ if (coreAction.getHadoopServicesRequired()) {
+ initHadoopBinding();
+ }
+ super.serviceInit(conf);
+ }
+
+ /**
+ * Launched service execution. This runs {@link #exec()}
+ * then catches some exceptions and converts them to exit codes
+ * @return an exit code
+ * @throws Throwable
+ */
+ @Override
+ public int runService() throws Throwable {
+ try {
+ return exec();
+ } catch (FileNotFoundException | PathNotFoundException nfe) {
+ throw new NotFoundException(nfe, nfe.toString());
+ }
+ }
+
+ /**
+ * Execute the command line
+ * @return an exit code
+ * @throws Throwable on a failure
+ */
+ public int exec() throws Throwable {
+
+ // choose the action
+ String action = serviceArgs.getAction();
+ if (isUnset(action)) {
+ throw new SliderException(EXIT_USAGE, serviceArgs.usage());
+ }
+
+ int exitCode = EXIT_SUCCESS;
+ String clusterName = serviceArgs.getClusterName();
+ // actions
+
+ switch (action) {
+ case ACTION_AM_SUICIDE:
+ exitCode = actionAmSuicide(clusterName,
+ serviceArgs.getActionAMSuicideArgs());
+ break;
+
+ case ACTION_BUILD:
+ exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+ break;
+
+ case ACTION_CLIENT:
+ exitCode = actionClient(serviceArgs.getActionClientArgs());
+ break;
+
+ case ACTION_CREATE:
+ exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+ break;
+
+ case ACTION_DEPENDENCY:
+ exitCode = actionDependency(serviceArgs.getActionDependencyArgs());
+ break;
+
+ case ACTION_DESTROY:
+ exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs());
+ break;
+
+ case ACTION_DIAGNOSTICS:
+ exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
+ break;
+
+ case ACTION_EXISTS:
+ exitCode = actionExists(clusterName,
+ serviceArgs.getActionExistsArgs());
+ break;
+
+ case ACTION_FLEX:
+ exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+ break;
+
+ case ACTION_FREEZE:
+ exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
+ break;
+
+ case ACTION_HELP:
+ log.info(serviceArgs.usage());
+ break;
+
+ case ACTION_KDIAG:
+ exitCode = actionKDiag(serviceArgs.getActionKDiagArgs());
+ break;
+
+ case ACTION_KILL_CONTAINER:
+ exitCode = actionKillContainer(clusterName,
+ serviceArgs.getActionKillContainerArgs());
+ break;
+
+ case ACTION_INSTALL_KEYTAB:
+ exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs());
+ break;
+
+ case ACTION_INSTALL_PACKAGE:
+ exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
+ break;
+
+ case ACTION_KEYTAB:
+ exitCode = actionKeytab(serviceArgs.getActionKeytabArgs());
+ break;
+
+ case ACTION_LIST:
+ exitCode = actionList(clusterName, serviceArgs.getActionListArgs());
+ break;
+
+ case ACTION_LOOKUP:
+ exitCode = actionLookup(serviceArgs.getActionLookupArgs());
+ break;
+
+ case ACTION_NODES:
+ exitCode = actionNodes("", serviceArgs.getActionNodesArgs());
+ break;
+
+ case ACTION_PACKAGE:
+ exitCode = actionPackage(serviceArgs.getActionPackageArgs());
+ break;
+
+ case ACTION_REGISTRY:
+ exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
+ break;
+
+ case ACTION_RESOLVE:
+ exitCode = actionResolve(serviceArgs.getActionResolveArgs());
+ break;
+
+ case ACTION_RESOURCE:
+ exitCode = actionResource(serviceArgs.getActionResourceArgs());
+ break;
+
+ case ACTION_STATUS:
+ exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs());
+ break;
+
+ case ACTION_THAW:
+ exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+ break;
+
+ case ACTION_TOKENS:
+ exitCode = actionTokens(serviceArgs.getActionTokenArgs());
+ break;
+
+ case ACTION_UPDATE:
+ exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+ break;
+
+ case ACTION_UPGRADE:
+ exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs());
+ break;
+
+ case ACTION_VERSION:
+ exitCode = actionVersion();
+ break;
+
+ default:
+ throw new SliderException(EXIT_UNIMPLEMENTED,
+ "Unimplemented: " + action);
+ }
+
+ return exitCode;
+ }
+
+ /**
+ * Perform everything needed to init the hadoop binding.
+ * This assumes that the service is already in inited or started state
+ * @throws IOException
+ * @throws SliderException
+ */
+ protected void initHadoopBinding() throws IOException, SliderException {
+ // validate the client
+ validateSliderClientEnvironment(null);
+ //create the YARN client
+ yarnClient = new SliderYarnClientImpl();
+ yarnClient.init(getConfig());
+ if (getServiceState() == STATE.STARTED) {
+ yarnClient.start();
+ }
+ addService(yarnClient);
+ yarnAppListClient =
+ new YarnAppListClient(yarnClient, getUsername(), getConfig());
+ // create the filesystem
+ sliderFileSystem = new SliderFileSystem(getConfig());
+ }
+
+ /**
+ * Delete the zookeeper node associated with the calling user and the cluster
+ * TODO: YARN registry operations
+ **/
+ @VisibleForTesting
+ public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
+ String user = getUsername();
+ String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+ Exception e = null;
+ try {
+ Configuration config = getConfig();
+ ZKIntegration client = getZkClient(clusterName, user);
+ if (client != null) {
+ if (client.exists(zkPath)) {
+ log.info("Deleting zookeeper path {}", zkPath);
+ }
+ client.deleteRecursive(zkPath);
+ return true;
+ }
+ } catch (InterruptedException | BadConfigException | KeeperException ex) {
+ e = ex;
+ }
+ if (e != null) {
+ log.warn("Unable to recursively delete zk node {}", zkPath, e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Create the zookeeper node associated with the calling user and the cluster
+ *
+ * @param clusterName slider application name
+ * @param nameOnly should the name only be created (i.e. don't create ZK node)
+ * @return the path, using the policy implemented in
+ * {@link ZKIntegration#mkClusterPath(String, String)}
+ * @throws YarnException
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
+ try {
+ return createZookeeperNodeInner(clusterName, nameOnly);
+ } catch (KeeperException.NodeExistsException e) {
+ return null;
+ } catch (KeeperException e) {
+ return null;
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ }
+ }
+
+ /**
+ * Create the zookeeper node associated with the calling user and the cluster
+ * -throwing exceptions on any failure
+ * @param clusterName cluster name
+ * @param nameOnly create the path, not the node
+ * @return the path, using the policy implemented in
+ * {@link ZKIntegration#mkClusterPath(String, String)}
+ * @throws YarnException
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ @VisibleForTesting
+ public String createZookeeperNodeInner(String clusterName, Boolean nameOnly)
+ throws YarnException, IOException, KeeperException, InterruptedException {
+ String user = getUsername();
+ String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+ if (nameOnly) {
+ return zkPath;
+ }
+ ZKIntegration client = getZkClient(clusterName, user);
+ if (client != null) {
+ // set up the permissions. This must be done differently on a secure cluster from an insecure
+ // one
+ List<ACL> zkperms = new ArrayList<>();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+ zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ } else {
+ zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ }
+ client.createPath(zkPath, "",
+ zkperms,
+ CreateMode.PERSISTENT);
+ return zkPath;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Gets a zookeeper client, returns null if it cannot connect to zookeeper
+ **/
+ protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
+ String registryQuorum = lookupZKQuorum();
+ ZKIntegration client = null;
+ try {
+ BlockingZKWatcher watcher = new BlockingZKWatcher();
+ client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher,
+ ZKIntegration.SESSION_TIMEOUT);
+ client.init();
+ watcher.waitForZKConnection(2 * 1000);
+ } catch (InterruptedException e) {
+ client = null;
+ log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+ } catch (IOException e) {
+ log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+ }
+ return client;
+ }
+
+ /**
+ * Keep this signature for backward compatibility with
+ * force=true by default.
+ */
+ @Override
+ public int actionDestroy(String clustername) throws YarnException,
+ IOException {
+ ActionDestroyArgs destroyArgs = new ActionDestroyArgs();
+ destroyArgs.force = true;
+ return actionDestroy(clustername, destroyArgs);
+ }
+
+ @Override
+ public int actionDestroy(String clustername,
+ ActionDestroyArgs destroyArgs) throws YarnException, IOException {
+ // verify that a live cluster isn't there
+ validateClusterName(clustername);
+ //no=op, it is now mandatory.
+ verifyBindingsDefined();
+ verifyNoLiveClusters(clustername, "Destroy");
+ boolean forceDestroy = destroyArgs.force;
+ log.debug("actionDestroy({}, force={})", clustername, forceDestroy);
+
+ // create the directory path
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ // delete the directory;
+ FileSystem fs = sliderFileSystem.getFileSystem();
+ boolean exists = fs.exists(clusterDirectory);
+ if (exists) {
+ log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
+ if (!forceDestroy) {
+ // fail the command if --force is not explicitly specified
+ throw new UsageException("Destroy will permanently delete directories and registries. "
+ + "Reissue this command with the --force option if you want to proceed.");
+ }
+ if (!fs.delete(clusterDirectory, true)) {
+ log.warn("Filesystem returned false from delete() operation");
+ }
+
+ if(!deleteZookeeperNode(clustername)) {
+ log.warn("Unable to perform node cleanup in Zookeeper.");
+ }
+
+ if (fs.exists(clusterDirectory)) {
+ log.warn("Failed to delete {}", clusterDirectory);
+ }
+
+ } else {
+ log.debug("Application Instance {} already destroyed", clustername);
+ }
+
+ // rm the registry entry \u2014do not let this block the destroy operations
+ String registryPath = SliderRegistryUtils.registryPathForInstance(
+ clustername);
+ try {
+ getRegistryOperations().delete(registryPath, true);
+ } catch (IOException e) {
+ log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
+ } catch (SliderException e) {
+ log.warn("Error binding to registry {} ", e, e);
+ }
+
+ List<ApplicationReport> instances = findAllLiveInstances(clustername);
+ // detect any race leading to cluster creation during the check/destroy process
+ // and report a problem.
+ if (!instances.isEmpty()) {
+ throw new SliderException(EXIT_APPLICATION_IN_USE,
+ clustername + ": "
+ + E_DESTROY_CREATE_RACE_CONDITION
+ + " :" +
+ instances.get(0));
+ }
+ log.info("Destroyed cluster {}", clustername);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionAmSuicide(String clustername,
+ ActionAMSuicideArgs args) throws YarnException, IOException {
+ SliderClusterOperations clusterOperations =
+ createClusterOperations(clustername);
+ clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public AbstractClientProvider createClientProvider(String provider)
+ throws SliderException {
+ SliderProviderFactory factory =
+ SliderProviderFactory.createSliderProviderFactory(provider);
+ return factory.createClientProvider();
+ }
+
+ /**
+ * Create the cluster -saving the arguments to a specification file first
+ * @param clustername cluster name
+ * @return the status code
+ * @throws YarnException Yarn problems
+ * @throws IOException other problems
+ * @throws BadCommandArgumentsException bad arguments.
+ */
+ public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
+ YarnException,
+ IOException {
+
+ actionBuild(clustername, createArgs);
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ clustername, clusterDirectory);
+ try {
+ checkForCredentials(getConfig(), instanceDefinition.getAppConf());
+ } catch (IOException e) {
+ sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
+ throw e;
+ }
+ return startCluster(clustername, createArgs);
+ }
+
+ @Override
+ public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
+ throws YarnException, IOException {
+ File template = upgradeArgs.template;
+ File resources = upgradeArgs.resources;
+ List<String> containers = upgradeArgs.containers;
+ List<String> components = upgradeArgs.components;
+
+ // For upgrade spec, let's be little more strict with validation. If either
+ // --template or --resources is specified, then both needs to be specified.
+ // Otherwise the internal app config and resources states of the app will be
+ // unwantedly modified and the change will take effect to the running app
+ // immediately.
+ require(!(template != null && resources == null),
+ "Option %s must be specified with option %s",
+ Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
+
+ require(!(resources != null && template == null),
+ "Option %s must be specified with option %s",
+ Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
+
+ // For upgrade spec, both --template and --resources should be specified
+ // and neither of --containers or --components should be used
+ if (template != null && resources != null) {
+ require(CollectionUtils.isEmpty(containers),
+ "Option %s cannot be specified with %s or %s",
+ Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
+ Arguments.ARG_RESOURCES);
+ require(CollectionUtils.isEmpty(components),
+ "Option %s cannot be specified with %s or %s",
+ Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
+ Arguments.ARG_RESOURCES);
+
+ // not an error to try to upgrade a stopped cluster, just return success
+ // code, appropriate log messages have already been dumped
+ if (!isAppInRunningState(clustername)) {
+ return EXIT_SUCCESS;
+ }
+
+ // Now initiate the upgrade spec flow
+ buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
+ SliderClusterOperations clusterOperations = createClusterOperations(clustername);
+ clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000);
+ return EXIT_SUCCESS;
+ }
+
+ // Since neither --template or --resources were specified, it is upgrade
+ // containers flow. Here any one or both of --containers and --components
+ // can be specified. If a container is specified with --containers option
+ // and also belongs to a component type specified with --components, it will
+ // be upgraded only once.
+ return actionUpgradeContainers(clustername, upgradeArgs);
+ }
+
+ private int actionUpgradeContainers(String clustername,
+ ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
+ verifyBindingsDefined();
+ validateClusterName(clustername);
+ int waittime = upgradeArgs.getWaittime(); // ignored for now
+ String text = "Upgrade containers";
+ log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
+ text, waittime);
+
+ // not an error to try to upgrade a stopped cluster, just return success
+ // code, appropriate log messages have already been dumped
+ if (!isAppInRunningState(clustername)) {
+ return EXIT_SUCCESS;
+ }
+
+ // Create sets of containers and components to get rid of duplicates and
+ // for quick lookup during checks below
+ Set<String> containers = new HashSet<>();
+ if (upgradeArgs.containers != null) {
+ containers.addAll(new ArrayList<>(upgradeArgs.containers));
+ }
+ Set<String> components = new HashSet<>();
+ if (upgradeArgs.components != null) {
+ components.addAll(new ArrayList<>(upgradeArgs.components));
+ }
+
+ // check validity of component names and running containers here
+ List<ContainerInformation> liveContainers = getContainers(clustername);
+ Set<String> validContainers = new HashSet<>();
+ Set<String> validComponents = new HashSet<>();
+ for (ContainerInformation liveContainer : liveContainers) {
+ boolean allContainersAndComponentsAccountedFor = true;
+ if (CollectionUtils.isNotEmpty(containers)) {
+ if (containers.contains(liveContainer.containerId)) {
+ containers.remove(liveContainer.containerId);
+ validContainers.add(liveContainer.containerId);
+ }
+ allContainersAndComponentsAccountedFor = false;
+ }
+ if (CollectionUtils.isNotEmpty(components)) {
+ if (components.contains(liveContainer.component)) {
+ components.remove(liveContainer.component);
+ validComponents.add(liveContainer.component);
+ }
+ allContainersAndComponentsAccountedFor = false;
+ }
+ if (allContainersAndComponentsAccountedFor) {
+ break;
+ }
+ }
+
+ // If any item remains in containers or components then they are invalid.
+ // Log warning for them and proceed.
+ if (CollectionUtils.isNotEmpty(containers)) {
+ log.warn("Invalid set of containers provided {}", containers);
+ }
+ if (CollectionUtils.isNotEmpty(components)) {
+ log.warn("Invalid set of components provided {}", components);
+ }
+
+ // If not a single valid container or component is specified do not proceed
+ if (CollectionUtils.isEmpty(validContainers)
+ && CollectionUtils.isEmpty(validComponents)) {
+ log.error("Not a single valid container or component specified. Nothing to do.");
+ return EXIT_NOT_FOUND;
+ }
+
+ SliderClusterProtocol appMaster = connect(findInstance(clustername));
+ Messages.UpgradeContainersRequestProto r =
+ Messages.UpgradeContainersRequestProto
+ .newBuilder()
+ .setMessage(text)
+ .addAllContainer(validContainers)
+ .addAllComponent(validComponents)
+ .build();
+ appMaster.upgradeContainers(r);
+ log.info("Cluster upgrade issued for -");
+ if (CollectionUtils.isNotEmpty(validContainers)) {
+ log.info(" Containers (total {}): {}", validContainers.size(),
+ validContainers);
+ }
+ if (CollectionUtils.isNotEmpty(validComponents)) {
+ log.info(" Components (total {}): {}", validComponents.size(),
+ validComponents);
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ // returns true if and only if app is in RUNNING state
+ private boolean isAppInRunningState(String clustername) throws YarnException,
+ IOException {
+ // is this actually a known cluster?
+ sliderFileSystem.locateInstanceDefinition(clustername);
+ ApplicationReport app = findInstance(clustername);
+ if (app == null) {
+ // exit early
+ log.info("Cluster {} not running", clustername);
+ return false;
+ }
+ log.debug("App to upgrade was found: {}:\n{}", clustername,
+ new OnDemandReportStringifier(app));
+ if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
+ log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.",
+ clustername, app.getYarnApplicationState(), ACTION_UPDATE);
+ return false;
+ }
+
+ // IPC request to upgrade containers is possible if the app is running.
+ if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING
+ .ordinal()) {
+ log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
+ + "to be RUNNING.", clustername, app.getYarnApplicationState());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected static void checkForCredentials(Configuration conf,
+ ConfTree tree) throws IOException {
+ if (tree.credentials == null || tree.credentials.isEmpty()) {
+ log.info("No credentials requested");
+ return;
+ }
+
+ BufferedReader br = null;
+ try {
+ for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
+ String provider = cred.getKey();
+ List<String> aliases = cred.getValue();
+ if (aliases == null || aliases.isEmpty()) {
+ continue;
+ }
+ Configuration c = new Configuration(conf);
+ c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+ CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0);
+ Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases());
+ for (String alias : aliases) {
+ if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
+ log.info("Credentials for " + alias + " found in " + provider);
+ } else {
+ if (br == null) {
+ br = new BufferedReader(new InputStreamReader(System.in));
+ }
+ char[] pass = readPassword(alias, br);
+ credentialProvider.createCredentialEntry(alias, pass);
+ credentialProvider.flush();
+ Arrays.fill(pass, ' ');
+ }
+ }
+ }
+ } finally {
+ org.apache.hadoop.io.IOUtils.closeStream(br);
+ }
+ }
+
+ private static char[] readOnePassword(String alias) throws IOException {
+ try(BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
+ return readPassword(alias, br);
+ }
+ }
+
+ // using a normal reader instead of a secure one,
+ // because stdin is not hooked up to the command line
+ private static char[] readPassword(String alias, BufferedReader br)
+ throws IOException {
+ char[] cred = null;
+
+ boolean noMatch;
+ do {
+ log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
+ char[] newPassword1 = br.readLine().toCharArray();
+ log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
+ char[] newPassword2 = br.readLine().toCharArray();
+ noMatch = !Arrays.equals(newPassword1, newPassword2);
+ if (noMatch) {
+ if (newPassword1 != null) Arrays.fill(newPassword1, ' ');
+ log.info(String.format("Passwords don't match. Try again."));
+ } else {
+ cred = newPassword1;
+ }
+ if (newPassword2 != null) Arrays.fill(newPassword2, ' ');
+ } while (noMatch);
+ if (cred == null)
+ throw new IOException("Could not read credentials for " + alias +
+ " from stdin");
+ return cred;
+ }
+
+ @Override
+ public int actionBuild(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo) throws
+ YarnException,
+ IOException {
+
+ buildInstanceDefinition(clustername, buildInfo, false, false);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionKeytab(ActionKeytabArgs keytabInfo)
+ throws YarnException, IOException {
+ if (keytabInfo.install) {
+ return actionInstallKeytab(keytabInfo);
+ } else if (keytabInfo.delete) {
+ return actionDeleteKeytab(keytabInfo);
+ } else if (keytabInfo.list) {
+ return actionListKeytab(keytabInfo);
+ } else {
+ throw new BadCommandArgumentsException(
+ "Keytab option specified not found.\n"
+ + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+ }
+ }
+
+ private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException {
+ String folder = keytabInfo.folder != null ? keytabInfo.folder : StringUtils.EMPTY;
+ Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder);
+ RemoteIterator<LocatedFileStatus> files =
+ sliderFileSystem.getFileSystem().listFiles(keytabPath, true);
+ log.info("Keytabs:");
+ while (files.hasNext()) {
+ log.info("\t" + files.next().getPath().toString());
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ private int actionDeleteKeytab(ActionKeytabArgs keytabInfo)
+ throws BadCommandArgumentsException, IOException {
+ if (StringUtils.isEmpty(keytabInfo.folder)) {
+ throw new BadCommandArgumentsException(
+ "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
+ + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+ }
+
+ if (StringUtils.isEmpty(keytabInfo.keytab)) {
+ throw new BadCommandArgumentsException("A keytab name is required.");
+ }
+
+ Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
+
+ Path fileInFs = new Path(pkgPath, keytabInfo.keytab );
+ log.info("Deleting keytab {}", fileInFs);
+ FileSystem sfs = sliderFileSystem.getFileSystem();
+ require(sfs.exists(fileInFs), "No keytab to delete found at %s",
+ fileInFs.toUri());
+ sfs.delete(fileInFs, false);
+
+ return EXIT_SUCCESS;
+ }
+
+ private int actionInstallKeytab(ActionKeytabArgs keytabInfo)
+ throws BadCommandArgumentsException, IOException {
+ Path srcFile = null;
+ require(isSet(keytabInfo.folder),
+ "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
+ + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+
+ requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab);
+ File keytabFile = new File(keytabInfo.keytab);
+ require(keytabFile.isFile(),
+ "Unable to access supplied keytab file at %s", keytabFile.getAbsolutePath());
+ srcFile = new Path(keytabFile.toURI());
+
+ Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
+ FileSystem sfs = sliderFileSystem.getFileSystem();
+ sfs.mkdirs(pkgPath);
+ sfs.setPermission(pkgPath, new FsPermission(
+ FsAction.ALL, FsAction.NONE, FsAction.NONE));
+
+ Path fileInFs = new Path(pkgPath, srcFile.getName());
+ log.info("Installing keytab {} at {} and overwrite is {}.",
+ srcFile, fileInFs, keytabInfo.overwrite);
+ require(!(sfs.exists(fileInFs) && !keytabInfo.overwrite),
+ "Keytab exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+
+ sfs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs);
+ sfs.setPermission(fileInFs,
+ new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
+ throws YarnException, IOException {
+ log.warn("The 'install-keytab' option has been deprecated. Please use 'keytab --install'.");
+ return actionKeytab(new ActionKeytabArgs(installKeytabInfo));
+ }
+
+ @Override
+ public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
+ YarnException,
+ IOException {
+ log.warn("The " + ACTION_INSTALL_PACKAGE
+ + " option has been deprecated. Please use '"
+ + ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'.");
+ if (StringUtils.isEmpty(installPkgInfo.name)) {
+ throw new BadCommandArgumentsException(
+ E_INVALID_APPLICATION_TYPE_NAME + "\n"
+ + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
+ }
+ Path srcFile = extractPackagePath(installPkgInfo.packageURI);
+
+ // Do not provide new options to install-package command as it is in
+ // deprecated mode. So version is kept null here. Use package --install.
+ Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name,
+ null);
+ FileSystem sfs = sliderFileSystem.getFileSystem();
+ sfs.mkdirs(pkgPath);
+
+ Path fileInFs = new Path(pkgPath, srcFile.getName());
+ log.info("Installing package {} at {} and overwrite is {}.",
+ srcFile, fileInFs, installPkgInfo.replacePkg);
+ require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg),
+ "Package exists at %s. : %s", fileInFs.toUri(), E_USE_REPLACEPKG_TO_OVERWRITE);
+ sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionResource(ActionResourceArgs resourceInfo)
+ throws YarnException, IOException {
+ if (resourceInfo.help) {
+ actionHelp(ACTION_RESOURCE);
+ return EXIT_SUCCESS;
+ } else if (resourceInfo.install) {
+ return actionInstallResource(resourceInfo);
+ } else if (resourceInfo.delete) {
+ return actionDeleteResource(resourceInfo);
+ } else if (resourceInfo.list) {
+ return actionListResource(resourceInfo);
+ } else {
+ throw new BadCommandArgumentsException(
+ "Resource option specified not found.\n"
+ + CommonArgs.usage(serviceArgs, ACTION_RESOURCE));
+ }
+ }
+
+ private int actionListResource(ActionResourceArgs resourceInfo) throws IOException {
+ String folder = resourceInfo.folder != null ? resourceInfo.folder : StringUtils.EMPTY;
+ Path path = sliderFileSystem.buildResourcePath(folder);
+ RemoteIterator<LocatedFileStatus> files =
+ sliderFileSystem.getFileSystem().listFiles(path, true);
+ log.info("Resources:");
+ while (files.hasNext()) {
+ log.info("\t" + files.next().getPath().toString());
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ private int actionDeleteResource(ActionResourceArgs resourceInfo)
+ throws BadCommandArgumentsException, IOException {
+ if (StringUtils.isEmpty(resourceInfo.resource)) {
+ throw new BadCommandArgumentsException("A file name is required.");
+ }
+
+ Path fileInFs;
+ if (resourceInfo.folder == null) {
+ fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.resource);
+ } else {
+ fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.folder,
+ resourceInfo.resource);
+ }
+
+ log.info("Deleting resource {}", fileInFs);
+ FileSystem sfs = sliderFileSystem.getFileSystem();
+ require(sfs.exists(fileInFs), "No resource to delete found at %s", fileInFs.toUri());
+ sfs.delete(fileInFs, true);
+
+ return EXIT_SUCCESS;
+ }
+
+ private int actionInstallResource(ActionResourceArgs resourceInfo)
+ throws BadCommandArgumentsException, IOException {
+ Path srcFile = null;
+ String folder = resourceInfo.folder != null ? resourceInfo.folder : StringUtils.EMPTY;
+
+ requireArgumentSet(Arguments.ARG_RESOURCE, resourceInfo.resource);
+ File file = new File(resourceInfo.resource);
+ require(file.isFile() || file.isDirectory(),
+ "Unable to access supplied file at %s", file.getAbsolutePath());
+
+ File[] files;
+ if (file.isDirectory()) {
+ files = file.listFiles();
+ } else {
+ files = new File[] { file };
+ }
+
+ Path pkgPath = sliderFileSystem.buildResourcePath(folder);
+ FileSystem sfs = sliderFileSystem.getFileSystem();
+
+ if (!sfs.exists(pkgPath)) {
+ sfs.mkdirs(pkgPath);
+ sfs.setPermission(pkgPath, new FsPermission(
+ FsAction.ALL, FsAction.NONE, FsAction.NONE));
+ } else {
+ require(sfs.isDirectory(pkgPath), "Specified folder %s exists and is " +
+ "not a directory", folder);
+ }
+
+ for (File f : files) {
+ srcFile = new Path(f.toURI());
+
+ Path fileInFs = new Path(pkgPath, srcFile.getName());
+ log.info("Installing file {} at {} and overwrite is {}.",
+ srcFile, fileInFs, resourceInfo.overwrite);
+ require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
+ "File exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+
+ sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
+ sfs.setPermission(fileInFs,
+ new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int actionClient(ActionClientArgs clientInfo) throws
+ YarnException,
+ IOException {
+ if (clientInfo.install) {
+ return doClientInstall(clientInfo);
+ } else if (clientInfo.getCertStore) {
+ return doCertificateStoreRetrieval(clientInfo);
+ } else {
+ throw new BadCommandArgumentsException(
+ "Only install, keystore, and truststore commands are supported for the client.\n"
+ + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+
+ }
+ }
+
+ private int doCertificateStoreRetrieval(ActionClientArgs clientInfo)
+ throws YarnException, IOException {
+ if (clientInfo.keystore != null && clientInfo.truststore != null) {
+ throw new BadCommandArgumentsException(
+ "Only one of either keystore or truststore can be retrieved at one time. "
+ + "Retrieval of both should be done separately\n"
+ + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+ }
+
+ requireArgumentSet(Arguments.ARG_NAME, clientInfo.name);
+
+ File storeFile = null;
+ SecurityStore.StoreType type;
+ if (clientInfo.keystore != null) {
+ storeFile = clientInfo.keystore;
+ type = SecurityStore.StoreType.keystore;
+ } else {
+ storeFile = clientInfo.truststore;
+ type = SecurityStore.StoreType.truststore;
+ }
+
+ require (!storeFile.exists(),
+ "File %s already exists. Please remove that file or select a different file name.",
+ storeFile.getAbsolutePath());
+ String hostname = null;
+ if (type == SecurityStore.StoreType.keystore) {
+ hostname = clientInfo.hostname;
+ if (hostname == null) {
+ hostname = InetAddress.getLocalHost().getCanonicalHostName();
+ log.info("No hostname specified via command line. Using {}", hostname);
+ }
+ }
+
+ String password = clientInfo.password;
+ if (password == null) {
+ String provider = clientInfo.provider;
+ String alias = clientInfo.alias;
+ if (provider != null && alias != null) {
+ Configuration conf = new Configuration(getConfig());
+ conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+ char[] chars = conf.getPassword(alias);
+ if (chars == null) {
+ CredentialProvider credentialProvider =
+ CredentialProviderFactory.getProviders(conf).get(0);
+ chars = readOnePassword(alias);
+ credentialProvider.createCredentialEntry(alias, chars);
+ credentialProvider.flush();
+ }
+ password = String.valueOf(chars);
+ Arrays.fill(chars, ' ');
+ } else {
+ log.info("No password and no provider/alias pair were provided, " +
+ "prompting for password");
+ // get a password
+ password = String.valueOf(readOnePassword(type.name()));
+ }
+ }
+
+ byte[] keystore = createClusterOperations(clientInfo.name)
+ .getClientCertificateStore(hostname, "client", password, type.name());
+ // persist to file
+ FileOutputStream storeFileOutputStream = null;
+ try {
+ storeFileOutputStream = new FileOutputStream(storeFile);
+ IOUtils.write(keystore, storeFileOutputStream);
+ } catch (Exception e) {
+ log.error("Unable to persist to file {}", storeFile);
+ throw e;
+ } finally {
+ if (storeFileOutputStream != null) {
+ storeFileOutputStream.close();
+ }
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ private int doClientInstall(ActionClientArgs clientInfo)
+ throws IOException, SliderException {
+
+ require(clientInfo.installLocation != null,
+ E_INVALID_INSTALL_LOCATION +"\n"
+ + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+ require(clientInfo.installLocation.exists(),
+ E_INSTALL_PATH_DOES_NOT_EXIST + ": " + clientInfo.installLocation.getAbsolutePath());
+
+ require(clientInfo.installLocation.isDirectory(),
+ E_INVALID_INSTALL_PATH + ": " + clientInfo.installLocation.getAbsolutePath());
+
+ File pkgFile;
+ File tmpDir = null;
+
+ require(isSet(clientInfo.packageURI) || isSet(clientInfo.name),
+ E_INVALID_APPLICATION_PACKAGE_LOCATION);
+ if (isSet(clientInfo.packageURI)) {
+ pkgFile = new File(clientInfo.packageURI);
+ } else {
+ Path appDirPath = sliderFileSystem.buildAppDefDirPath(clientInfo.name);
+ Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG);
+ require(sliderFileSystem.isFile(appDefPath),
+ E_INVALID_APPLICATION_PACKAGE_LOCATION);
+ tmpDir = Files.createTempDir();
+ pkgFile = new File(tmpDir, SliderKeys.DEFAULT_APP_PKG);
+ sliderFileSystem.copyHdfsFileToLocal(appDefPath, pkgFile);
+ }
+ require(pkgFile.isFile(),
+ E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", pkgFile.getAbsolutePath());
+
+ JSONObject config = null;
+ if(clientInfo.clientConfig != null) {
+ try {
+ byte[] encoded = Files.toByteArray(clientInfo.clientConfig);
+ config = new JSONObject(new String(encoded, Charset.defaultCharset()));
+ } catch (JSONException jsonEx) {
+ log.error("Unable to read supplied configuration at {}: {}",
+ clientInfo.clientConfig, jsonEx);
+ log.debug("Unable to read supplied configuration at {}: {}",
+ clientInfo.clientConfig, jsonEx, jsonEx);
+ throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx);
+ }
+ }
+
+ // Only INSTALL is supported
+ AbstractClientProvider
+ provider = createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE);
+ provider.processClientOperation(sliderFileSystem,
+ getRegistryOperations(),
+ getConfig(),
+ "INSTALL",
+ clientInfo.installLocation,
+ pkgFile,
+ config,
+ clientInfo.name);
+ return EXIT_SUCCESS;
+ }
+
+
+ @Override
+ public int actionPackage(ActionPackageArgs actionPackageInfo)
+ throws YarnException, IOException {
+ initializeOutputStream(actionPackageInfo.out);
+ int exitCode = -1;
+ if (actionPackageInfo.help) {
+ exitCode = actionHelp(ACTION_PACKAGE);
+ }
+ if (actionPackageInfo.install) {
+ exitCode = actionPackageInstall(actionPackageInfo);
+ }
+ if (actionPackageInfo.delete) {
+ exitCode = actionPackageDelete(actionPackageInfo);
+ }
+ if (actionPackageInfo.list) {
+ exitCode = actionPackageList();
+ }
+ if (actionPackageInfo.instances) {
+ exitCode = actionPackageInstances();
+ }
+ finalizeOutputStream(actionPackageInfo.out);
+ if (exitCode != -1) {
+ return exitCode;
+ }
+ throw new BadCommandArgumentsException(
+ "Select valid package operation option");
+ }
+
+ private void initializeOutputStream(String outFile)
+ throws FileNotFoundException {
+ if (outFile != null) {
+ clientOutputStream = new PrintStream(new FileOutputStream(outFile));
+ } else {
+ clientOutputStream = System.out;
+ }
+ }
+
+ private void finalizeOutputStream(String outFile) {
+ if (outFile != null && clientOutputStream != null) {
+ clientOutputStream.flush();
+ clientOutputStream.close();
+ }
+ clientOutputStream = System.out;
+ }
+
+ private int actionPackageInstances() throws YarnException, IOException {
+ Map<String, Path> persistentInstances = sliderFileSystem
+ .listPersistentInstances();
+ if (persistentInstances.isEmpty()) {
+ log.info("No slider cluster specification available");
+ return EXIT_SUCCESS;
+ }
+ String pkgPathValue = sliderFileSystem
+ .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri()
+ .getPath();
+ FileSystem fs = sliderFileSystem.getFileSystem();
+ Iterator<Map.Entry<String, Path>> instanceItr = persistentInstances
+ .entrySet().iterator();
+ log.info("List of applications with its package name and path");
+ println("%-25s %15s %30s %s", "Cluster Name", "Package Name",
+ "Package Version", "Application Location");
+ while(instanceItr.hasNext()) {
+ Map.Entry<String, Path> entry = instanceItr.next();
+ String clusterName = entry.getKey();
+ Path clusterPath = entry.getValue();
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ clusterName, clusterPath);
+ Path appDefPath = null;
+ try {
+ appDefPath = new Path(
+ getApplicationDefinitionPath(instanceDefinition
+ .getAppConfOperations()));
+ } catch (BadConfigException e) {
+ // Invalid cluster state, so move on to next. No need to log anything
+ // as this is just listing of instances.
+ continue;
+ }
+ if (!appDefPath.isUriPathAbsolute()) {
+ appDefPath = new Path(fs.getHomeDirectory(), appDefPath);
+ }
+ String appDefPathStr = appDefPath.toUri().toString();
+ try {
+ if (appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath)) {
+ String packageName = appDefPath.getParent().getName();
+ String packageVersion = StringUtils.EMPTY;
+ if (instanceDefinition.isVersioned()) {
+ packageVersion = packageName;
+ packageName = appDefPath.getParent().getParent().getName();
+ }
+ println("%-25s %15s %30s %s", clusterName, packageName,
+ packageVersion, appDefPathStr);
+ }
+ } catch (IOException e) {
+ log.debug("{} application definition path {} is not found.", clusterName, appDefPathStr);
+ }
+ }
+ return EXIT_SUCCESS;
+ }
+
+ private int actionPackageList() throws IOException {
+ Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
+ StringUtils.EMPTY);
+ log.info("Package install path : {}", pkgPath);
+ FileSystem sfs = sliderFileSystem.getFileSystem();
+ if (!sfs.isDirectory(pkgPath)) {
+ log.info("No package(s) installed");
+ return EXIT_SUCCESS;
+ }
+ FileStatus[] fileStatus = sfs.listStatus(pkgPath);
+ boolean hasPackage = false;
+ StringBuilder sb = new StringBuilder();
+ sb.append("List of installed packages:\n");
+ for (FileStatus fstat : fileStatus) {
+ if (fstat.isDirectory()) {
+ sb.append("\t").append(fstat.getPath().getName());
+ sb.append("\n");
+ hasPackage = true;
+ }
+ }
+ if (hasPackage) {
+ println(sb.toString());
+ } else {
+ log.info("No package(s) installed");
+ }
+ return EXIT_SUCCESS;
+ }
+
+ private void createSummaryMetainfoFile(Path srcFile, Path destFile,
+ boolean overwrite) throws IOException {
+ FileSystem srcFs = srcFile.getFileSystem(getConfig());
+ try (InputStream inputStreamJson = SliderUtils
+ .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.json");
+ InputStream inputStreamXml = SliderUtils
+ .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.xml");) {
+ InputStream inputStream = null;
+ Path summaryFileInFs = null;
+ if (inputStreamJson != null) {
+ inputStream = inputStreamJson;
+ summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
+ + ".metainfo.json");
+ log.info("Found JSON metainfo file in package");
+ } else if (inputStreamXml != null) {
+ inputStream = inputStreamXml;
+ summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
+ + ".metainfo.xml");
+ log.info("Found XML metainfo file in package");
+ }
+ if (inputStream != null) {
+ try (FSDataOutputStream dataOutputStream = sliderFileSystem
+ .getFileSystem().create(summaryFileInFs, overwrite)) {
+ log.info("Creating summary metainfo file");
+ IOUtils.copy(inputStream, dataOutputStream);
+ }
+ }
+ }
+ }
+
+ private int actionPackageInstall(ActionPackageArgs actionPackageArgs)
+ throws YarnException, IOException {
+ requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
+
+ Path srcFile = extractPackagePath(actionPackageArgs.packageURI);
+
+ Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
+ actionPackageArgs.version);
+ FileSystem fs = sliderFileSystem.getFileSystem();
+ if (!fs.exists(pkgPath)) {
+ fs.mkdirs(pkgPath);
+ }
+
+ Path fileInFs = new Path(pkgPath, srcFile.getName());
+ require(actionPackageArgs.replacePkg || !fs.exists(fileInFs),
+ E_PACKAGE_EXISTS +" at %s. Use --replacepkg to overwrite.", fileInFs.toUri());
+
+ log.info("Installing package {} to {} (overwrite set to {})", srcFile,
+ fileInFs, actionPackageArgs.replacePkg);
+ fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile, fileInFs);
+ createSummaryMetainfoFile(srcFile, fileInFs, actionPackageArgs.replacePkg);
+
+ String destPathWithHomeDir = Path
+ .getPathWithoutSchemeAndAuthority(fileInFs).toString();
+ String destHomeDir = Path.getPathWithoutSchemeAndAuthority(
+ fs.getHomeDirectory()).toString();
+ // a somewhat contrived approach to stripping out the home directory and any trailing
+ // separator; designed to work on windows and unix
+ String destPathWithoutHomeDir;
+ if (destPathWithHomeDir.startsWith(destHomeDir)) {
+ destPathWithoutHomeDir = destPathWithHomeDir.substring(destHomeDir.length());
+ if (destPathWithoutHomeDir.startsWith("/") || destPathWithoutHomeDir.startsWith("\\")) {
+ destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1);
+ }
+ } else {
+ destPathWithoutHomeDir = destPathWithHomeDir;
+ }
+ log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}",
+ destPathWithoutHomeDir);
+
+ return EXIT_SUCCESS;
+ }
+
+ private Path extractPackagePath(String packageURI)
+ throws BadCommandArgumentsException {
+ require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION);
+ File pkgFile = new File(packageURI);
+ require(pkgFile.isFile(),
+ E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ": " + pkgFile.getAbsolutePath());
+ return new Path(pkgFile.toURI());
+ }
+
+ private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws
+ YarnException, IOException {
+ requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
+
+ Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
+ actionPackageArgs.version);
+ FileSystem fs = sliderFileSystem.getFileSystem();
+ require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", pkgPath.toUri());
+ log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath);
+
+ if(fs.delete(pkgPath, true)) {
+ log.info("Deleted package {} " + actionPackageArgs.name);
+ return EXIT_SUCCESS;
+ } else {
+ log.warn("Package deletion failed.");
+ return EXIT_NOT_FOUND;
+ }
+ }
+
+ @Override
+ public int actionUpdate(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo) throws
+ YarnException, IOException {
+ buildInstanceDefinition(clustername, buildInfo, true, true);
+ return EXIT_SUCCESS;
+ }
+
+ /**
+ * Build up the AggregateConfiguration for an application instance then
+ * persists it
+ * @param clustername name of the cluster
+ * @param buildInfo the arguments needed to build the cluster
+ * @param overwrite true if existing cluster directory can be overwritten
+ * @param liveClusterAllowed true if live cluster can be modified
+ * @throws YarnException
+ * @throws IOException
+ */
+
+ public void buildInstanceDefinition(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
+ boolean liveClusterAllowed) throws YarnException, IOException {
+ buildInstanceDefinition(clustername, buildInfo, overwrite,
+ liveClusterAllowed, false);
+ }
+
+ public void buildInstanceDefinition(String clustername,
+ AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
+ boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
+ IOException {
+ // verify that a live cluster isn't there
+ validateClusterName(clustername);
+ verifyBindingsDefined();
+ if (!liveClusterAllowed) {
+ verifyNoLiveClusters(clustername, "Create");
+ }
+
+ Configuration conf = getConfig();
+ String registryQuorum = lookupZKQuorum();
+
+ Path appconfdir = buildInfo.getConfdir();
+ // Provider
+ String providerName = buildInfo.getProvider();
+ requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
+ log.debug("Provider is {}", providerName);
+ SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
+ AbstractClientProvider provider =
+ createClientProvider(providerName);
+ InstanceBuilder builder =
+ new InstanceBuilder(sliderFileSystem,
+ getConfig(),
+ clustername);
+
+ AggregateConf instanceDefinition = new AggregateConf();
+ ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+ ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+ ConfTreeOperations internal = instanceDefinition.getInternalOperations();
+ //initial definition is set by the providers
+ sliderAM.prepareInstanceConfiguration(instanceDefinition);
+ provider.prepareInstanceConfiguration(instanceDefinition);
+
+ //load in any specified on the command line
+ if (buildInfo.resources != null) {
+ try {
+ resources.mergeFile(buildInfo.resources,
+ new ResourcesInputPropertiesValidator());
+
+ } catch (IOException e) {
+ throw new BadConfigException(e,
+ "incorrect argument to %s: \"%s\" : %s ",
+ Arguments.ARG_RESOURCES,
+ buildInfo.resources,
+ e.toString());
+ }
+ }
+ if (buildInfo.template != null) {
+ try {
+ appConf.mergeFile(buildInfo.template,
+ new TemplateInputPropertiesValidator());
+ } catch (IOException e) {
+ throw new BadConfigException(e,
+ "incorrect argument to %s: \"%s\" : %s ",
+ Arguments.ARG_TEMPLATE,
+ buildInfo.template,
+ e.toString());
+ }
+ }
+
+ if (isUpgradeFlow) {
+ ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
+ if (!upgradeInfo.force) {
+ validateClientAndClusterResource(clustername, resources);
+ }
+ }
+
+ //get the command line options
+ ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
+ ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
+
+ appConf.merge(cmdLineAppOptions);
+
+ AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
+ appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
+
+ // put the role counts into the resources file
+ Map<String, String> argsRoleMap = buildInfo.getComponentMap();
+ for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
+ String count = roleEntry.getValue();
+ String key = roleEntry.getKey();
+ log.info("{} => {}", key, count);
+ resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
+ }
+
+ //all CLI role options
+ Map<String, Map<String, String>> appOptionMap =
+ buildInfo.getCompOptionMap();
+ appConf.mergeComponents(appOptionMap);
+
+ //internal picks up core. values only
+ internal.propagateGlobalKeys(appConf, "slider.");
+ internal.propagateGlobalKeys(appConf, "internal.");
+
+ //copy over role. and yarn. values ONLY to the resources
+ if (PROPAGATE_RESOURCE_OPTION) {
+ resources.propagateGlobalKeys(appConf, "component.");
+ resources.propagateGlobalKeys(appConf, "role.");
+ resources.propagateGlobalKeys(appConf, "yarn.");
+ resources.mergeComponentsPrefix(appOptionMap, "component.", true);
+ resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
+ resources.mergeComponentsPrefix(appOptionMap, "role.", true);
+ }
+
+ // resource component args
+ appConf.merge(cmdLineResourceOptions);
+ resources.merge(cmdLineResourceOptions);
+ resources.mergeComponents(buildInfo.getResourceCompOptionMap());
+
+ builder.init(providerName, instanceDefinition);
+ builder.propagateFilename();
+ builder.propagatePrincipals();
+ builder.setImageDetailsIfAvailable(buildInfo.getImage(),
+ buildInfo.getAppHomeDir());
+ builder.setQueue(buildInfo.queue);
+
+ String quorum = buildInfo.getZKhosts();
+ if (isUnset(quorum)) {
+ quorum = registryQuorum;
+ }
+ if (isUnset(quorum)) {
+ throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
+ }
+ ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
+ getUsername(),
+ clustername,
+ registryQuorum,
+ quorum);
+ String zookeeperRoot = buildInfo.getAppZKPath();
+
+ if (isSet(zookeeperRoot)) {
+ zkPaths.setAppPath(zookeeperRoot);
+ } else {
+ String createDefaultZkNode = appConf.getGlobalOptions()
+ .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+ if (createDefaultZkNode.equals("true")) {
+ String defaultZKPath = createZookeeperNode(clustername, false);
+ log.debug("ZK node created for application instance: {}", defaultZKPath);
+ if (defaultZKPath != null) {
+ zkPaths.setAppPath(defaultZKPath);
+ }
+ } else {
+ // create AppPath if default is being used
+ String defaultZKPath = createZookeeperNode(clustername, true);
+ log.debug("ZK node assigned to application instance: {}", defaultZKPath);
+ zkPaths.setAppPath(defaultZKPath);
+ }
+ }
+
+ builder.addZKBinding(zkPaths);
+
+ //then propagate any package URI
+ if (buildInfo.packageURI != null) {
+ appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
+ }
+
+ propagatePythonExecutable(conf, instanceDefinition);
+
+ // make any substitutions needed at this stage
+ replaceTokens(appConf.getConfTree(), getUsername(), clustername);
+
+ // TODO: Refactor the validation code and persistence code
+ try {
+ persistInstanceDefinition(overwrite, appconfdir, builder);
+ appDefinitionPersister.persistPackages();
+
+ } catch (LockAcquireFailedException e) {
+ log.warn("Failed to get a Lock on {} : {}", builder, e, e);
+ throw new BadClusterStateException("Failed to save " + clustername
+ + ": " + e);
+ }
+
+ // providers to validate what there is
+ // TODO: Validation should be done before persistence
+ AggregateConf instanceDescription = builder.getInstanceDescription();
+ validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
+ validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
+ }
+
+ private void validateClientAndClusterResource(String clustername,
+ ConfTreeOperations clientResources) throws BadClusterStateException,
+ SliderException, IOException {
+ log.info("Validating upgrade resource definition with current cluster "
+ + "state (components and instance count)");
+ Map<String, Integer> clientComponentInstances = new HashMap<>();
+ for (String componentName : clientResources.getComponentNames()) {
+ if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
+ clientComponentInstances.put(componentName, clientResources
+ .getComponentOptInt(componentName,
+ COMPONENT_INSTANCES, -1));
+ }
+ }
+
+ AggregateConf clusterConf = null;
+ try {
+ clusterConf = loadPersistedClusterDescription(clustername);
+ } catch (LockAcquireFailedException e) {
+ log.warn("Failed to get a Lock on cluster resource : {}", e, e);
+ throw new BadClusterStateException(
+ "Failed to load client resource definition " + clustername + ": " + e, e);
+ }
+ Map<String, Integer> clusterComponentInstances = new HashMap<>();
+ for (Map.Entry<String, Map<String, String>> component : clusterConf
+ .getResources().components.entrySet()) {
+ if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
+ clusterComponentInstances.put(
+ component.getKey(),
+ Integer.decode(component.getValue().get(
+ COMPONENT_INSTANCES)));
+ }
+ }
+
+ // client and cluster should be an exact match
+ Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances
+ .entrySet().iterator();
+ while (clientComponentInstanceIt.hasNext()) {
+ Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next();
+ if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) {
+ // compare instance count now and remove from both maps if they match
+ if (clusterComponentInstances
+ .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry
+ .getValue().intValue()) {
+ clusterComponentInstances.remove(clientComponentInstanceEntry
+ .getKey());
+ clientComponentInstanceIt.remove();
+ }
+ }
+ }
+
+ if (!clientComponentInstances.isEmpty()
+ || !clusterComponentInstances.isEmpty()) {
+ log.error("Mismatch found in upgrade resource definition and cluster "
+ + "resource state");
+ if (!clientComponentInstances.isEmpty()) {
+ log.info("The upgrade resource definitions that do not match are:");
+ for (Map.Entry<String, Integer> clientComponentInstanceEntry : clientComponentInstances
+ .entrySet()) {
+ log.info(" Component Name: {}, Instance count: {}",
+ clientComponentInstanceEntry.getKey(),
+ clientComponentInstanceEntry.getValue());
+ }
+ }
+ if (!clusterComponentInstances.isEmpty()) {
+ log.info("The cluster resources that do not match are:");
+ for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances
+ .entrySet()) {
+ log.info(" Component Name: {}, Instance count: {}",
+ clusterComponentInstanceEntry.getKey(),
+ clusterComponentInstanceEntry.getValue());
+ }
+ }
+ throw new BadConfigException("Resource definition provided for "
+ + "upgrade does not match with that of the currently running "
+ + "cluster.\nIf you are aware of what you are doing, rerun the "
+ + "command with " + Arguments.ARG_FORCE + " option.");
+ }
+ }
+
+ protected void persistInstanceDefinition(boolean overwrite,
+ Path appconfdir,
+ InstanceBuilder builder)
+ throws IOException, SliderException, LockAcquireFailedException {
+ builder.persist(appconfdir, overwrite);
+ }
+
+ @VisibleForTesting
+ public static void replaceTokens(ConfTree conf,
+ String userName, String clusterName) throws IOException {
+ Map<String,String> newglobal = new HashMap<>();
+ for (Entry<String,String> entry : conf.global.entrySet()) {
+ newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
+ userName, clusterName));
+ }
+ conf.global.putAll(newglobal);
+
+ for (String component : conf.components.keySet()) {
+ Map<String,String> newComponent = new HashMap<>();
+ for (Entry<String,String> entry : conf.components.get(component).entrySet()) {
+ newComponent.put(entry.getKey(), replaceTokens(entry.getValue(),
+ userName, clusterName));
+ }
+ conf.components.get(component).putAll(newComponent);
+ }
+
+ Map<String,List<String>> newcred = new HashMap<>();
+ for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
+ List<String> resultList = new ArrayList<>();
+ for (String v : entry.getValue()) {
+ resultList.add(replaceTokens(v, userName, clusterName));
+ }
+ newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
+ resultList);
+ }
+ conf.credentials.clear();
+ conf.credentials.putAll(newcred);
+ }
+
+ private static String replaceTokens(String s, String userName,
+ String clusterName) throws IOException {
+ return s.replaceAll(Pattern.quote("${USER}"), userName)
+ .replaceAll(Pattern.quote("${USER_NAME}"), userName)
+ .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName);
+ }
+
+ public FsPermission getClusterDirectoryPermissions(Configuration conf) {
+ String clusterDirPermsOct =
+ conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
+ return new FsPermission(clusterDirPermsOct);
+ }
+
+ /**
+ * Verify that the Resource Manager is configured (on a non-HA cluster).
+ * with a useful error message
+ * @throws BadCommandArgumentsException the exception raised on an invalid config
+ */
+ public void verifyBindingsDefined() throws BadCommandArgumentsException {
+ InetSocketAddress rmAddr = getRmAddress(getConfig());
+ if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false)
+ && !isAddressDefined(rmAddr)) {
+ throw new BadCommandArgumentsException(
+ E_NO_RESOURCE_MANAGER
+ + " in the argument "
+ + Arguments.ARG_MANAGER
+ + " or the configuration property "
+ + YarnConfiguration.RM_ADDRESS
+ + " value :" + rmAddr);
+ }
+ }
+
+ /**
+ * Load and start a cluster specification.
+ * This assumes that all validation of args and cluster state
+ * have already taken place
+ *
+ * @param clustername name of the cluster.
+ * @param launchArgs launch arguments
+ * @return the exit code
+ * @throws YarnException
+ * @throws IOException
+ */
+ protected int startCluster(String clustername,
+ LaunchArgsAccessor launchArgs) throws
+ YarnException,
+ IOException {
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ clustername,
+ clusterDirectory);
+
+ LaunchedApplication launchedApplication =
+ launchApplication(clustername, clusterDirectory, instanceDefinition,
+ serviceArgs.isDebug());
+
+ if (launchArgs.getOutputFile() != null) {
+ // output file has been requested. Get the app report and serialize it
+ ApplicationReport report =
+ launchedApplication.getApplicationReport();
+ SerializedApplicationReport sar = new SerializedApplicationReport(report);
+ sar.submitTime = System.currentTimeMillis();
+ ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser();
+ serDeser.save(sar, launchArgs.getOutputFile());
+ }
+ int waittime = launchArgs.getWaittime();
+ if (waittime > 0) {
+ return waitForAppRunning(launchedApplication, waittime, waittime);
+ } else {
+ // no waiting
+ return EXIT_SUCCESS;
+ }
+ }
+
+ /**
+ * Load the instance definition. It is not resolved at this point
+ * @param name cluster name
+ * @param clusterDirectory cluster dir
+ * @return the loaded configuration
+ * @throws IOException
+ * @throws SliderException
+ * @throws UnknownApplicationInstanceException if the file is not found
+ */
+ public AggregateConf loadInstanceDefinitionUnresolved(String name,
+ Path clusterDirectory) throws IOException, SliderException {
+
+ try {
+ AggregateConf definition =
+ InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
+ clusterDirectory);
+ definition.setName(name);
+ return definition;
+ } catch (FileNotFoundException e) {
+ throw UnknownApplicationInstanceException.unknownInstance(name, e);
+ }
+ }
+
+ /**
+ * Load the instance definition.
+ * @param name cluster name
+ * @param resolved flag to indicate the cluster should be resolved
+ * @return the loaded configuration
+ * @throws IOException IO problems
+ * @throws SliderException slider explicit issues
+ * @throws UnknownApplicationInstanceException if the file is not found
+ */
+ public AggregateConf loadInstanceDefinition(String name,
+ boolean resolved) throws
+ IOException,
+ SliderException {
+
+ Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+ AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+ name,
+ clusterDirectory);
+ if (resolved) {
+ instanceDefinition.resolve();
+ }
+ return instanceDefinition;
+
+ }
+
+ protected AppMasterLauncher setupAppMasterLauncher(String clustername,
+ Path clusterDirectory,
+ AggregateConf instanceDefinition,
+ boolean debugAM)
+ throws YarnException, IOException{
+ deployedClusterName = clustername;
+ validateClusterName(clustername);
+ verifyNoLiveClusters(clustername, "Launch");
+ Configuration config = getConfig();
+ lookupZKQuorum();
+ boolean clusterSecure = isHadoopClusterSecure(config);
+ //create the Slider AM provider -this helps set up the AM
+ SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
+
+ instanceDefinition.resolve();
+ launchedInstanceDefinition = instanceDefinition;
+
+ ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations();
+ MapOperations internalOptions = internalOperations.getGlobalOptions();
+ ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations();
+ ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations();
+ Path generatedConfDirPath =
+ createPathThatMustExist(internalOptions.getMandatoryOption(
+ INTERNAL_GENERATED_CONF_PATH));
+ Path snapshotConfPath =
+ createPathThatMustExist(internalOptions.getMandatoryOption(
+ INTERNAL_SNAPSHOT_CONF_PATH));
+
+
+ // cluster Provider
+ AbstractClientProvider provider = createClientProvider(
+ internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
+ if (log.isDebugEnabled()) {
+ log.debug(instanceDefinition.toString());
+ }
+ MapOperations sliderAMResourceComponent =
+ resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
+ MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
+
+ // add the tags if available
+ Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
+ getApplicationDefinitionPath(appOperations));
+
+ Credentials credentials = null;
+ if (clusterSecure) {
+ // pick up oozie credentials
+ credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
+ config);
+ if (credentials == null) {
+ // nothing from oozie, so build up directly
+ credentials = new Credentials(
+ UserGroupInformation.getCurrentUser().getCredentials());
+ CredentialUtils.addRMRenewableFSDelegationTokens(config,
+ sliderFileSystem.getFileSystem(),
+ credentials);
+ CredentialUtils.addRMDelegationToken(yarnClient, credentials);
+
+ } else {
+ log.info("Using externally supplied credentials to launch AM");
+ }
+ }
+
+ AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
+ SliderKeys.APP_TYPE,
+ config,
+ sliderFileSystem,
+ yarnClient,
+ clusterSecure,
+ sliderAMResourceComponent,
+ resourceGlobalOptions,
+ applicationTags,
+ credentials);
+
+ ApplicationId appId = amLauncher.getApplicationId();
+ // set the application name;
+ amLauncher.setKeepContainersOverRestarts(true);
+
+ int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
+ amLauncher.setMaxAppAttempts(maxAppAttempts);
+
+ sliderFileSystem.purgeAppInstanceTempFiles(clustername);
+ Path tempPath = sliderFileSystem.createAppInstanceTempPath(
+ clustername,
+ appId.toString() + "/am");
+ String libdir = "lib";
+ Path libPath = new Path(tempPath, libdir);
+ sliderFileSystem.getFileSystem().mkdirs(libPath);
+ log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath);
+
+ // set local resources for the application master
+ // local files or archives as needed
+ // In this scenario, the jar file for the application master is part of the local resources
+ Map<String, LocalResource> localResources = amLauncher.getLocalResources();
+
+ // look for the configuration directory named on the command line
+ boolean hasServerLog4jProperties = false;
+ Path remoteConfPath = null;
+ String relativeConfDir = null;
+ String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
+ if (isUnset(confdirProp)) {
+ log.debug("No local configuration directory provided as system property");
+ } else {
+ File confDir = new File(confdirProp);
+ if (!confDir.exists()) {
+ throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
+ confDir);
+ }
+ Path localConfDirPath = createLocalPath(confDir);
+ remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
+ log.debug("Slider configuration directory is {}; remote to be {}",
+ localConfDirPath, remoteConfPath);
+ copyDirectory(config, localConfDirPath, remoteConfPath, null);
+
+ File log4jserver =
+ new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+ hasServerLog4jProperties = log4jserver.isFile();
+ }
+ // the assumption here is that minimr cluster => this is a test run
+ // and the classpath can look after itself
+
+ boolean usingMiniMRCluster = getUsingMiniMRCluster();
+ if (!usingMiniMRCluster) {
+
+ log.debug("Destination is not a MiniYARNCluster -copying full classpath");
+
+ // insert conf dir first
+ if (remoteConfPath != null) {
+ relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
+ Map<String, LocalResource> submittedConfDir =
+ sliderFileSystem.submitDirectory(remoteConfPath,
+ relativeConfDir);
+ mergeMaps(localResources, submittedConfDir);
+ }
+ }
+ // build up the configuration
+ // IMPORTANT: it is only after this call that site configurations
+ // will be valid.
+
+ propagatePrincipals(config, instanceDefinition);
+ // validate security data
+
+/*
+ // turned off until tested
+ SecurityConfiguration securityConfiguration =
+ new SecurityConfiguration(config,
+ instanceDefinition, clustername);
+
+*/
+ Configuration clientConfExtras = new Configuration(false);
+ // then build up the generated path.
+ FsPermission clusterPerms = getClusterDirectoryPermissions(config);
+ copyDirectory(config, snapshotConfPath, generatedConfDirPath,
+ clusterPerms);
+
+
+ // standard AM resources
+ sliderAM.prepareAMAnd
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org