You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:10:32 UTC

[32/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
new file mode 100644
index 0000000..8210f4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -0,0 +1,4569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.slider.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.KerberosDiags;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.SliderApplicationApi;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.StateValues;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.types.ContainerInformation;
+import org.apache.slider.api.types.NodeInformationList;
+import org.apache.slider.api.types.SliderInstanceDescription;
+import org.apache.slider.client.ipc.SliderApplicationIpcClient;
+import org.apache.slider.client.ipc.SliderClusterOperations;
+import org.apache.slider.common.Constants;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.params.AbstractActionArgs;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+import org.apache.slider.common.params.ActionAMSuicideArgs;
+import org.apache.slider.common.params.ActionClientArgs;
+import org.apache.slider.common.params.ActionCreateArgs;
+import org.apache.slider.common.params.ActionDependencyArgs;
+import org.apache.slider.common.params.ActionDestroyArgs;
+import org.apache.slider.common.params.ActionDiagnosticArgs;
+import org.apache.slider.common.params.ActionEchoArgs;
+import org.apache.slider.common.params.ActionExistsArgs;
+import org.apache.slider.common.params.ActionFlexArgs;
+import org.apache.slider.common.params.ActionFreezeArgs;
+import org.apache.slider.common.params.ActionInstallKeytabArgs;
+import org.apache.slider.common.params.ActionInstallPackageArgs;
+import org.apache.slider.common.params.ActionKDiagArgs;
+import org.apache.slider.common.params.ActionKeytabArgs;
+import org.apache.slider.common.params.ActionKillContainerArgs;
+import org.apache.slider.common.params.ActionListArgs;
+import org.apache.slider.common.params.ActionLookupArgs;
+import org.apache.slider.common.params.ActionNodesArgs;
+import org.apache.slider.common.params.ActionPackageArgs;
+import org.apache.slider.common.params.ActionRegistryArgs;
+import org.apache.slider.common.params.ActionResolveArgs;
+import org.apache.slider.common.params.ActionResourceArgs;
+import org.apache.slider.common.params.ActionStatusArgs;
+import org.apache.slider.common.params.ActionThawArgs;
+import org.apache.slider.common.params.ActionTokensArgs;
+import org.apache.slider.common.params.ActionUpgradeArgs;
+import org.apache.slider.common.params.Arguments;
+import org.apache.slider.common.params.ClientArgs;
+import org.apache.slider.common.params.CommonArgs;
+import org.apache.slider.common.params.LaunchArgsAccessor;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.Duration;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.common.tools.SliderVersionInfo;
+import org.apache.slider.core.build.InstanceBuilder;
+import org.apache.slider.core.build.InstanceIO;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
+import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.NotFoundException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
+import org.apache.slider.core.exceptions.UsageException;
+import org.apache.slider.core.exceptions.WaitTimeoutException;
+import org.apache.slider.core.launch.AppMasterLauncher;
+import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.CredentialUtils;
+import org.apache.slider.core.launch.JavaCommandLineBuilder;
+import org.apache.slider.core.launch.LaunchedApplication;
+import org.apache.slider.core.launch.RunningApplication;
+import org.apache.slider.core.launch.SerializedApplicationReport;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.core.persist.AppDefinitionPersister;
+import org.apache.slider.core.persist.ApplicationReportSerDeser;
+import org.apache.slider.core.persist.ConfPersister;
+import org.apache.slider.core.persist.JsonSerDeser;
+import org.apache.slider.core.persist.LockAcquireFailedException;
+import org.apache.slider.core.registry.SliderRegistryUtils;
+import org.apache.slider.core.registry.YarnAppListClient;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExports;
+import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
+import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.slider.core.zk.ZKPathBuilder;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.SliderProviderFactory;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.slideram.SliderAMClientProvider;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.rpc.RpcBinder;
+import org.apache.slider.server.services.security.SecurityStore;
+import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
+import static org.apache.slider.api.InternalKeys.*;
+import static org.apache.slider.api.OptionKeys.*;
+import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
+import static org.apache.slider.common.params.SliderActions.*;
+import static org.apache.slider.common.tools.SliderUtils.*;
+
+
+/**
+ * Client service for Slider
+ */
+
+public class SliderClient extends AbstractSliderLaunchedService implements RunService,
+    SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
+  private static final Logger log = LoggerFactory.getLogger(SliderClient.class);
+  public static final String E_MUST_BE_A_VALID_JSON_FILE
+      = "Invalid configuration. Must be a valid json file.";
+  public static final String E_INVALID_INSTALL_LOCATION
+      = "A valid install location must be provided for the client.";
+  public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
+      = "Unable to read supplied package file";
+  public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION
+      = "A valid application package location required.";
+  public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory";
+  public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist";
+  public static final String E_INVALID_APPLICATION_TYPE_NAME
+      = "A valid application type name is required (e.g. HBASE).";
+  public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite.";
+  public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist";
+  public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined";
+  public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided";
+  public static final String E_PACKAGE_EXISTS = "Package exists";
+  private static PrintStream clientOutputStream = System.out;
+
+  // value should not be changed without updating string find in slider.py
+  private static final String PASSWORD_PROMPT = "Enter password for";
+
+  private ClientArgs serviceArgs;
+  public ApplicationId applicationId;
+  
+  private String deployedClusterName;
+  /**
+   * Cluster operations against the deployed cluster -will be null
+   * if no bonding has yet taken place
+   */
+  private SliderClusterOperations sliderClusterOperations;
+
+  protected SliderFileSystem sliderFileSystem;
+
+  /**
+   * Yarn client service
+   */
+  private SliderYarnClientImpl yarnClient;
+  private YarnAppListClient yarnAppListClient;
+  private AggregateConf launchedInstanceDefinition;
+
+  /**
+   * The YARN registry service
+   */
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+  private RegistryOperations registryOperations;
+
+  /**
+   * Constructor
+   */
+  public SliderClient() {
+    super("Slider Client");
+    new HdfsConfiguration();
+    new YarnConfiguration();
+  }
+
+  /**
+   * This is called <i>Before serviceInit is called</i>
+   * @param config the initial configuration build up by the
+   * service launcher.
+   * @param args argument list list of arguments passed to the command line
+   * after any launcher-specific commands have been stripped.
+   * @return the post-binding configuration to pass to the <code>init()</code>
+   * operation.
+   * @throws Exception
+   */
+  @Override
+  public Configuration bindArgs(Configuration config, String... args) throws Exception {
+    config = super.bindArgs(config, args);
+    serviceArgs = new ClientArgs(args);
+    serviceArgs.parse();
+    // add the slider XML config
+    ConfigHelper.injectSliderXMLResource();
+    // yarn-ify
+    YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
+    return patchConfiguration(yarnConfiguration);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    Configuration clientConf = loadSliderClientXML();
+    ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true);
+    serviceArgs.applyDefinitions(conf);
+    serviceArgs.applyFileSystemBinding(conf);
+    AbstractActionArgs coreAction = serviceArgs.getCoreAction();
+    // init security with our conf
+    if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) {
+      forceLogin();
+      initProcessSecurity(conf);
+    }
+    if (coreAction.getHadoopServicesRequired()) {
+      initHadoopBinding();
+    }
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Launched service execution. This runs {@link #exec()}
+   * then catches some exceptions and converts them to exit codes
+   * @return an exit code
+   * @throws Throwable
+   */
+  @Override
+  public int runService() throws Throwable {
+    try {
+      return exec();
+    } catch (FileNotFoundException | PathNotFoundException nfe) {
+      throw new NotFoundException(nfe, nfe.toString());
+    }
+  }
+
+  /**
+   * Execute the command line
+   * @return an exit code
+   * @throws Throwable on a failure
+   */
+  public int exec() throws Throwable {
+
+    // choose the action
+    String action = serviceArgs.getAction();
+    if (isUnset(action)) {
+      throw new SliderException(EXIT_USAGE, serviceArgs.usage());
+    }
+
+    int exitCode = EXIT_SUCCESS;
+    String clusterName = serviceArgs.getClusterName();
+    // actions
+
+    switch (action) {
+      case ACTION_AM_SUICIDE:
+        exitCode = actionAmSuicide(clusterName,
+            serviceArgs.getActionAMSuicideArgs());
+        break;
+      
+      case ACTION_BUILD:
+        exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+        break;
+      
+      case ACTION_CLIENT:
+        exitCode = actionClient(serviceArgs.getActionClientArgs());
+        break;
+
+      case ACTION_CREATE:
+        exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+        break;
+
+      case ACTION_DEPENDENCY:
+        exitCode = actionDependency(serviceArgs.getActionDependencyArgs());
+        break;
+
+      case ACTION_DESTROY:
+        exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs());
+        break;
+
+      case ACTION_DIAGNOSTICS:
+        exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
+        break;
+      
+      case ACTION_EXISTS:
+        exitCode = actionExists(clusterName,
+            serviceArgs.getActionExistsArgs());
+        break;
+      
+      case ACTION_FLEX:
+        exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+        break;
+      
+      case ACTION_FREEZE:
+        exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs());
+        break;
+      
+      case ACTION_HELP:
+        log.info(serviceArgs.usage());
+        break;
+
+      case ACTION_KDIAG:
+        exitCode = actionKDiag(serviceArgs.getActionKDiagArgs());
+        break;
+
+      case ACTION_KILL_CONTAINER:
+        exitCode = actionKillContainer(clusterName,
+            serviceArgs.getActionKillContainerArgs());
+        break;
+
+      case ACTION_INSTALL_KEYTAB:
+        exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs());
+        break;
+      
+      case ACTION_INSTALL_PACKAGE:
+        exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
+        break;
+
+      case ACTION_KEYTAB:
+        exitCode = actionKeytab(serviceArgs.getActionKeytabArgs());
+        break;
+
+      case ACTION_LIST:
+        exitCode = actionList(clusterName, serviceArgs.getActionListArgs());
+        break;
+
+      case ACTION_LOOKUP:
+        exitCode = actionLookup(serviceArgs.getActionLookupArgs());
+        break;
+
+      case ACTION_NODES:
+        exitCode = actionNodes("", serviceArgs.getActionNodesArgs());
+        break;
+
+      case ACTION_PACKAGE:
+        exitCode = actionPackage(serviceArgs.getActionPackageArgs());
+        break;
+
+      case ACTION_REGISTRY:
+        exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
+        break;
+      
+      case ACTION_RESOLVE:
+        exitCode = actionResolve(serviceArgs.getActionResolveArgs());
+        break;
+
+      case ACTION_RESOURCE:
+        exitCode = actionResource(serviceArgs.getActionResourceArgs());
+        break;
+
+      case ACTION_STATUS:
+        exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs());
+        break;
+
+      case ACTION_THAW:
+        exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+        break;
+
+      case ACTION_TOKENS:
+        exitCode = actionTokens(serviceArgs.getActionTokenArgs());
+        break;
+
+      case ACTION_UPDATE:
+        exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+        break;
+
+      case ACTION_UPGRADE:
+        exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs());
+        break;
+
+      case ACTION_VERSION:
+        exitCode = actionVersion();
+        break;
+      
+      default:
+        throw new SliderException(EXIT_UNIMPLEMENTED,
+            "Unimplemented: " + action);
+    }
+   
+    return exitCode;
+  }
+
+  /**
+   * Perform everything needed to init the hadoop binding.
+   * This assumes that the service is already  in inited or started state
+   * @throws IOException
+   * @throws SliderException
+   */
+  protected void initHadoopBinding() throws IOException, SliderException {
+    // validate the client
+    validateSliderClientEnvironment(null);
+    //create the YARN client
+    yarnClient = new SliderYarnClientImpl();
+    yarnClient.init(getConfig());
+    if (getServiceState() == STATE.STARTED) {
+      yarnClient.start();
+    }
+    addService(yarnClient);
+    yarnAppListClient =
+        new YarnAppListClient(yarnClient, getUsername(), getConfig());
+    // create the filesystem
+    sliderFileSystem = new SliderFileSystem(getConfig());
+  }
+
+  /**
+   * Delete the zookeeper node associated with the calling user and the cluster
+   * TODO: YARN registry operations
+   **/
+  @VisibleForTesting
+  public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    Exception e = null;
+    try {
+      Configuration config = getConfig();
+      ZKIntegration client = getZkClient(clusterName, user);
+      if (client != null) {
+        if (client.exists(zkPath)) {
+          log.info("Deleting zookeeper path {}", zkPath);
+        }
+        client.deleteRecursive(zkPath);
+        return true;
+      }
+    } catch (InterruptedException | BadConfigException | KeeperException ex) {
+      e = ex;
+    }
+    if (e != null) {
+      log.warn("Unable to recursively delete zk node {}", zkPath, e);
+    }
+
+    return false;
+  }
+
+  /**
+   * Create the zookeeper node associated with the calling user and the cluster
+   *
+   * @param clusterName slider application name
+   * @param nameOnly should the name only be created (i.e. don't create ZK node)
+   * @return the path, using the policy implemented in
+   *   {@link ZKIntegration#mkClusterPath(String, String)}
+   * @throws YarnException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
+    try {
+      return createZookeeperNodeInner(clusterName, nameOnly);
+    } catch (KeeperException.NodeExistsException e) {
+      return null;
+    } catch (KeeperException e) {
+      return null;
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.toString());
+    }
+  }
+
+  /**
+   * Create the zookeeper node associated with the calling user and the cluster
+   * -throwing exceptions on any failure
+   * @param clusterName cluster name
+   * @param nameOnly create the path, not the node
+   * @return the path, using the policy implemented in
+   *   {@link ZKIntegration#mkClusterPath(String, String)}
+   * @throws YarnException
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @VisibleForTesting
+  public String createZookeeperNodeInner(String clusterName, Boolean nameOnly)
+      throws YarnException, IOException, KeeperException, InterruptedException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    if (nameOnly) {
+      return zkPath;
+    }
+    ZKIntegration client = getZkClient(clusterName, user);
+    if (client != null) {
+      // set up the permissions. This must be done differently on a secure cluster from an insecure
+      // one
+      List<ACL> zkperms = new ArrayList<>();
+      if (UserGroupInformation.isSecurityEnabled()) {
+        zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+        zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+      } else {
+        zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+      }
+      client.createPath(zkPath, "",
+          zkperms,
+          CreateMode.PERSISTENT);
+      return zkPath;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Gets a zookeeper client, returns null if it cannot connect to zookeeper
+   **/
+  protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
+    String registryQuorum = lookupZKQuorum();
+    ZKIntegration client = null;
+    try {
+      BlockingZKWatcher watcher = new BlockingZKWatcher();
+      client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher,
+          ZKIntegration.SESSION_TIMEOUT);
+      client.init();
+      watcher.waitForZKConnection(2 * 1000);
+    } catch (InterruptedException e) {
+      client = null;
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    } catch (IOException e) {
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    }
+    return client;
+  }
+
+  /**
+   * Keep this signature for backward compatibility with
+   * force=true by default.
+   */
+  @Override
+  public int actionDestroy(String clustername) throws YarnException,
+                                                      IOException {
+    ActionDestroyArgs destroyArgs = new ActionDestroyArgs();
+    destroyArgs.force = true;
+    return actionDestroy(clustername, destroyArgs);
+  }
+
+  @Override
+  public int actionDestroy(String clustername,
+      ActionDestroyArgs destroyArgs) throws YarnException, IOException {
+    // verify that a live cluster isn't there
+    validateClusterName(clustername);
+    //no=op, it is now mandatory. 
+    verifyBindingsDefined();
+    verifyNoLiveClusters(clustername, "Destroy");
+    boolean forceDestroy = destroyArgs.force;
+    log.debug("actionDestroy({}, force={})", clustername, forceDestroy);
+
+    // create the directory path
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    // delete the directory;
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    boolean exists = fs.exists(clusterDirectory);
+    if (exists) {
+      log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
+      if (!forceDestroy) {
+        // fail the command if --force is not explicitly specified
+        throw new UsageException("Destroy will permanently delete directories and registries. "
+            + "Reissue this command with the --force option if you want to proceed.");
+      }
+      if (!fs.delete(clusterDirectory, true)) {
+        log.warn("Filesystem returned false from delete() operation");
+      }
+
+      if(!deleteZookeeperNode(clustername)) {
+        log.warn("Unable to perform node cleanup in Zookeeper.");
+      }
+
+      if (fs.exists(clusterDirectory)) {
+        log.warn("Failed to delete {}", clusterDirectory);
+      }
+
+    } else {
+      log.debug("Application Instance {} already destroyed", clustername);
+    }
+
+    // rm the registry entry \u2014do not let this block the destroy operations
+    String registryPath = SliderRegistryUtils.registryPathForInstance(
+        clustername);
+    try {
+      getRegistryOperations().delete(registryPath, true);
+    } catch (IOException e) {
+      log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
+    } catch (SliderException e) {
+      log.warn("Error binding to registry {} ", e, e);
+    }
+
+    List<ApplicationReport> instances = findAllLiveInstances(clustername);
+    // detect any race leading to cluster creation during the check/destroy process
+    // and report a problem.
+    if (!instances.isEmpty()) {
+      throw new SliderException(EXIT_APPLICATION_IN_USE,
+                              clustername + ": "
+                              + E_DESTROY_CREATE_RACE_CONDITION
+                              + " :" +
+                              instances.get(0));
+    }
+    log.info("Destroyed cluster {}", clustername);
+    return EXIT_SUCCESS;
+  }
+  
+  @Override
+  public int actionAmSuicide(String clustername,
+      ActionAMSuicideArgs args) throws YarnException, IOException {
+    SliderClusterOperations clusterOperations =
+      createClusterOperations(clustername);
+    clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public AbstractClientProvider createClientProvider(String provider)
+    throws SliderException {
+    SliderProviderFactory factory =
+      SliderProviderFactory.createSliderProviderFactory(provider);
+    return factory.createClientProvider();
+  }
+
+  /**
+   * Create the cluster -saving the arguments to a specification file first
+   * @param clustername cluster name
+   * @return the status code
+   * @throws YarnException Yarn problems
+   * @throws IOException other problems
+   * @throws BadCommandArgumentsException bad arguments.
+   */
+  public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
+                                               YarnException,
+                                               IOException {
+
+    actionBuild(clustername, createArgs);
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+        clustername, clusterDirectory);
+    try {
+      checkForCredentials(getConfig(), instanceDefinition.getAppConf());
+    } catch (IOException e) {
+      sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
+      throw e;
+    }
+    return startCluster(clustername, createArgs);
+  }
+
+  @Override
+  public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
+      throws YarnException, IOException {
+    File template = upgradeArgs.template;
+    File resources = upgradeArgs.resources;
+    List<String> containers = upgradeArgs.containers;
+    List<String> components = upgradeArgs.components;
+
+    // For upgrade spec, let's be little more strict with validation. If either
+    // --template or --resources is specified, then both needs to be specified.
+    // Otherwise the internal app config and resources states of the app will be
+    // unwantedly modified and the change will take effect to the running app
+    // immediately.
+    require(!(template != null && resources == null),
+          "Option %s must be specified with option %s",
+          Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
+
+    require(!(resources != null && template == null),
+          "Option %s must be specified with option %s",
+          Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
+
+    // For upgrade spec, both --template and --resources should be specified
+    // and neither of --containers or --components should be used
+    if (template != null && resources != null) {
+      require(CollectionUtils.isEmpty(containers),
+            "Option %s cannot be specified with %s or %s",
+            Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
+            Arguments.ARG_RESOURCES);
+      require(CollectionUtils.isEmpty(components),
+              "Option %s cannot be specified with %s or %s",
+              Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
+              Arguments.ARG_RESOURCES);
+
+      // not an error to try to upgrade a stopped cluster, just return success
+      // code, appropriate log messages have already been dumped
+      if (!isAppInRunningState(clustername)) {
+        return EXIT_SUCCESS;
+      }
+
+      // Now initiate the upgrade spec flow
+      buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
+      SliderClusterOperations clusterOperations = createClusterOperations(clustername);
+      clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000);
+      return EXIT_SUCCESS;
+    }
+
+    // Since neither --template or --resources were specified, it is upgrade
+    // containers flow. Here any one or both of --containers and --components
+    // can be specified. If a container is specified with --containers option
+    // and also belongs to a component type specified with --components, it will
+    // be upgraded only once.
+    return actionUpgradeContainers(clustername, upgradeArgs);
+  }
+
+  private int actionUpgradeContainers(String clustername,
+      ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
+    verifyBindingsDefined();
+    validateClusterName(clustername);
+    int waittime = upgradeArgs.getWaittime(); // ignored for now
+    String text = "Upgrade containers";
+    log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
+        text, waittime);
+
+    // not an error to try to upgrade a stopped cluster, just return success
+    // code, appropriate log messages have already been dumped
+    if (!isAppInRunningState(clustername)) {
+      return EXIT_SUCCESS;
+    }
+
+    // Create sets of containers and components to get rid of duplicates and
+    // for quick lookup during checks below
+    Set<String> containers = new HashSet<>();
+    if (upgradeArgs.containers != null) {
+      containers.addAll(new ArrayList<>(upgradeArgs.containers));
+    }
+    Set<String> components = new HashSet<>();
+    if (upgradeArgs.components != null) {
+      components.addAll(new ArrayList<>(upgradeArgs.components));
+    }
+
+    // check validity of component names and running containers here
+    List<ContainerInformation> liveContainers = getContainers(clustername);
+    Set<String> validContainers = new HashSet<>();
+    Set<String> validComponents = new HashSet<>();
+    for (ContainerInformation liveContainer : liveContainers) {
+      boolean allContainersAndComponentsAccountedFor = true;
+      if (CollectionUtils.isNotEmpty(containers)) {
+        if (containers.contains(liveContainer.containerId)) {
+          containers.remove(liveContainer.containerId);
+          validContainers.add(liveContainer.containerId);
+        }
+        allContainersAndComponentsAccountedFor = false;
+      }
+      if (CollectionUtils.isNotEmpty(components)) {
+        if (components.contains(liveContainer.component)) {
+          components.remove(liveContainer.component);
+          validComponents.add(liveContainer.component);
+        }
+        allContainersAndComponentsAccountedFor = false;
+      }
+      if (allContainersAndComponentsAccountedFor) {
+        break;
+      }
+    }
+
+    // If any item remains in containers or components then they are invalid.
+    // Log warning for them and proceed.
+    if (CollectionUtils.isNotEmpty(containers)) {
+      log.warn("Invalid set of containers provided {}", containers);
+    }
+    if (CollectionUtils.isNotEmpty(components)) {
+      log.warn("Invalid set of components provided {}", components);
+    }
+
+    // If not a single valid container or component is specified do not proceed
+    if (CollectionUtils.isEmpty(validContainers)
+        && CollectionUtils.isEmpty(validComponents)) {
+      log.error("Not a single valid container or component specified. Nothing to do.");
+      return EXIT_NOT_FOUND;
+    }
+
+    SliderClusterProtocol appMaster = connect(findInstance(clustername));
+    Messages.UpgradeContainersRequestProto r =
+      Messages.UpgradeContainersRequestProto
+              .newBuilder()
+              .setMessage(text)
+              .addAllContainer(validContainers)
+              .addAllComponent(validComponents)
+              .build();
+    appMaster.upgradeContainers(r);
+    log.info("Cluster upgrade issued for -");
+    if (CollectionUtils.isNotEmpty(validContainers)) {
+      log.info(" Containers (total {}): {}", validContainers.size(),
+          validContainers);
+    }
+    if (CollectionUtils.isNotEmpty(validComponents)) {
+      log.info(" Components (total {}): {}", validComponents.size(),
+          validComponents);
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  // returns true if and only if app is in RUNNING state
+  private boolean isAppInRunningState(String clustername) throws YarnException,
+      IOException {
+    // is this actually a known cluster?
+    sliderFileSystem.locateInstanceDefinition(clustername);
+    ApplicationReport app = findInstance(clustername);
+    if (app == null) {
+      // exit early
+      log.info("Cluster {} not running", clustername);
+      return false;
+    }
+    log.debug("App to upgrade was found: {}:\n{}", clustername,
+        new OnDemandReportStringifier(app));
+    if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
+      log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.",
+          clustername, app.getYarnApplicationState(), ACTION_UPDATE);
+      return false;
+    }
+
+    // IPC request to upgrade containers is possible if the app is running.
+    if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING
+        .ordinal()) {
+      log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
+          + "to be RUNNING.", clustername, app.getYarnApplicationState());
+      return false;
+    }
+
+    return true;
+  }
+
+  protected static void checkForCredentials(Configuration conf,
+      ConfTree tree) throws IOException {
+    if (tree.credentials == null || tree.credentials.isEmpty()) {
+      log.info("No credentials requested");
+      return;
+    }
+
+    BufferedReader br = null;
+    try {
+      for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
+        String provider = cred.getKey();
+        List<String> aliases = cred.getValue();
+        if (aliases == null || aliases.isEmpty()) {
+          continue;
+        }
+        Configuration c = new Configuration(conf);
+        c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+        CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0);
+        Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases());
+        for (String alias : aliases) {
+          if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
+            log.info("Credentials for " + alias + " found in " + provider);
+          } else {
+            if (br == null) {
+              br = new BufferedReader(new InputStreamReader(System.in));
+            }
+            char[] pass = readPassword(alias, br);
+            credentialProvider.createCredentialEntry(alias, pass);
+            credentialProvider.flush();
+            Arrays.fill(pass, ' ');
+          }
+        }
+      }
+    } finally {
+      org.apache.hadoop.io.IOUtils.closeStream(br);
+    }
+  }
+
+  private static char[] readOnePassword(String alias) throws IOException {
+    try(BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
+      return readPassword(alias, br);
+    }
+  }
+
+  // using a normal reader instead of a secure one,
+  // because stdin is not hooked up to the command line
+  private static char[] readPassword(String alias, BufferedReader br)
+      throws IOException {
+    char[] cred = null;
+
+    boolean noMatch;
+    do {
+      log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
+      char[] newPassword1 = br.readLine().toCharArray();
+      log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
+      char[] newPassword2 = br.readLine().toCharArray();
+      noMatch = !Arrays.equals(newPassword1, newPassword2);
+      if (noMatch) {
+        if (newPassword1 != null) Arrays.fill(newPassword1, ' ');
+        log.info(String.format("Passwords don't match. Try again."));
+      } else {
+        cred = newPassword1;
+      }
+      if (newPassword2 != null) Arrays.fill(newPassword2, ' ');
+    } while (noMatch);
+    if (cred == null)
+      throw new IOException("Could not read credentials for " + alias +
+          " from stdin");
+    return cred;
+  }
+
+  @Override
+  public int actionBuild(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+                                               YarnException,
+                                               IOException {
+
+    buildInstanceDefinition(clustername, buildInfo, false, false);
+    return EXIT_SUCCESS; 
+  }
+
+  @Override
+  public int actionKeytab(ActionKeytabArgs keytabInfo)
+      throws YarnException, IOException {
+    if (keytabInfo.install) {
+      return actionInstallKeytab(keytabInfo);
+    } else if (keytabInfo.delete) {
+      return actionDeleteKeytab(keytabInfo);
+    } else if (keytabInfo.list) {
+      return actionListKeytab(keytabInfo);
+    } else {
+      throw new BadCommandArgumentsException(
+          "Keytab option specified not found.\n"
+          + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+    }
+  }
+
+  private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException {
+    String folder = keytabInfo.folder != null ? keytabInfo.folder : StringUtils.EMPTY;
+    Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder);
+    RemoteIterator<LocatedFileStatus> files =
+        sliderFileSystem.getFileSystem().listFiles(keytabPath, true);
+    log.info("Keytabs:");
+    while (files.hasNext()) {
+      log.info("\t" + files.next().getPath().toString());
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  private int actionDeleteKeytab(ActionKeytabArgs keytabInfo)
+      throws BadCommandArgumentsException, IOException {
+    if (StringUtils.isEmpty(keytabInfo.folder)) {
+      throw new BadCommandArgumentsException(
+          "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
+          + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+    }
+
+    if (StringUtils.isEmpty(keytabInfo.keytab)) {
+      throw new BadCommandArgumentsException("A keytab name is required.");
+    }
+
+    Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
+
+    Path fileInFs = new Path(pkgPath, keytabInfo.keytab );
+    log.info("Deleting keytab {}", fileInFs);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    require(sfs.exists(fileInFs), "No keytab to delete found at %s",
+        fileInFs.toUri());
+    sfs.delete(fileInFs, false);
+
+    return EXIT_SUCCESS;
+  }
+
+  private int actionInstallKeytab(ActionKeytabArgs keytabInfo)
+      throws BadCommandArgumentsException, IOException {
+    Path srcFile = null;
+    require(isSet(keytabInfo.folder),
+        "A valid destination keytab sub-folder name is required (e.g. 'security').\n"
+        + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+
+    requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab);
+    File keytabFile = new File(keytabInfo.keytab);
+    require(keytabFile.isFile(),
+        "Unable to access supplied keytab file at %s", keytabFile.getAbsolutePath());
+    srcFile = new Path(keytabFile.toURI());
+
+    Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    sfs.mkdirs(pkgPath);
+    sfs.setPermission(pkgPath, new FsPermission(
+        FsAction.ALL, FsAction.NONE, FsAction.NONE));
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    log.info("Installing keytab {} at {} and overwrite is {}.",
+        srcFile, fileInFs, keytabInfo.overwrite);
+    require(!(sfs.exists(fileInFs) && !keytabInfo.overwrite),
+        "Keytab exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+
+    sfs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs);
+    sfs.setPermission(fileInFs,
+        new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
+      throws YarnException, IOException {
+    log.warn("The 'install-keytab' option has been deprecated.  Please use 'keytab --install'.");
+    return actionKeytab(new ActionKeytabArgs(installKeytabInfo));
+  }
+
+  @Override
+  public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
+      YarnException,
+      IOException {
+    log.warn("The " + ACTION_INSTALL_PACKAGE
+        + " option has been deprecated. Please use '"
+        + ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'.");
+    if (StringUtils.isEmpty(installPkgInfo.name)) {
+      throw new BadCommandArgumentsException(
+          E_INVALID_APPLICATION_TYPE_NAME + "\n"
+              + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
+    }
+    Path srcFile = extractPackagePath(installPkgInfo.packageURI);
+
+    // Do not provide new options to install-package command as it is in
+    // deprecated mode. So version is kept null here. Use package --install.
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name,
+        null);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    sfs.mkdirs(pkgPath);
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    log.info("Installing package {} at {} and overwrite is {}.",
+        srcFile, fileInFs, installPkgInfo.replacePkg);
+    require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg),
+          "Package exists at %s. : %s", fileInFs.toUri(), E_USE_REPLACEPKG_TO_OVERWRITE);
+    sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionResource(ActionResourceArgs resourceInfo)
+      throws YarnException, IOException {
+    if (resourceInfo.help) {
+      actionHelp(ACTION_RESOURCE);
+      return EXIT_SUCCESS;
+    } else if (resourceInfo.install) {
+      return actionInstallResource(resourceInfo);
+    } else if (resourceInfo.delete) {
+      return actionDeleteResource(resourceInfo);
+    } else if (resourceInfo.list) {
+      return actionListResource(resourceInfo);
+    } else {
+      throw new BadCommandArgumentsException(
+          "Resource option specified not found.\n"
+              + CommonArgs.usage(serviceArgs, ACTION_RESOURCE));
+    }
+  }
+
+  private int actionListResource(ActionResourceArgs resourceInfo) throws IOException {
+    String folder = resourceInfo.folder != null ? resourceInfo.folder : StringUtils.EMPTY;
+    Path path = sliderFileSystem.buildResourcePath(folder);
+    RemoteIterator<LocatedFileStatus> files =
+        sliderFileSystem.getFileSystem().listFiles(path, true);
+    log.info("Resources:");
+    while (files.hasNext()) {
+      log.info("\t" + files.next().getPath().toString());
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  private int actionDeleteResource(ActionResourceArgs resourceInfo)
+      throws BadCommandArgumentsException, IOException {
+    if (StringUtils.isEmpty(resourceInfo.resource)) {
+      throw new BadCommandArgumentsException("A file name is required.");
+    }
+
+    Path fileInFs;
+    if (resourceInfo.folder == null) {
+      fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.resource);
+    } else {
+      fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.folder,
+          resourceInfo.resource);
+    }
+
+    log.info("Deleting resource {}", fileInFs);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    require(sfs.exists(fileInFs), "No resource to delete found at %s", fileInFs.toUri());
+    sfs.delete(fileInFs, true);
+
+    return EXIT_SUCCESS;
+  }
+
+  private int actionInstallResource(ActionResourceArgs resourceInfo)
+      throws BadCommandArgumentsException, IOException {
+    Path srcFile = null;
+    String folder = resourceInfo.folder != null ? resourceInfo.folder : StringUtils.EMPTY;
+
+    requireArgumentSet(Arguments.ARG_RESOURCE, resourceInfo.resource);
+    File file = new File(resourceInfo.resource);
+    require(file.isFile() || file.isDirectory(),
+        "Unable to access supplied file at %s", file.getAbsolutePath());
+
+    File[] files;
+    if (file.isDirectory()) {
+      files = file.listFiles();
+    } else {
+      files = new File[] { file };
+    }
+
+    Path pkgPath = sliderFileSystem.buildResourcePath(folder);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+
+    if (!sfs.exists(pkgPath)) {
+      sfs.mkdirs(pkgPath);
+      sfs.setPermission(pkgPath, new FsPermission(
+          FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    } else {
+      require(sfs.isDirectory(pkgPath), "Specified folder %s exists and is " +
+          "not a directory", folder);
+    }
+
+    for (File f : files) {
+      srcFile = new Path(f.toURI());
+
+      Path fileInFs = new Path(pkgPath, srcFile.getName());
+      log.info("Installing file {} at {} and overwrite is {}.",
+          srcFile, fileInFs, resourceInfo.overwrite);
+      require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
+          "File exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+
+      sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
+      sfs.setPermission(fileInFs,
+          new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionClient(ActionClientArgs clientInfo) throws
+      YarnException,
+      IOException {
+    if (clientInfo.install) {
+      return doClientInstall(clientInfo);
+    } else if (clientInfo.getCertStore) {
+      return doCertificateStoreRetrieval(clientInfo);
+    } else {
+      throw new BadCommandArgumentsException(
+          "Only install, keystore, and truststore commands are supported for the client.\n"
+          + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+
+    }
+  }
+
+  private int doCertificateStoreRetrieval(ActionClientArgs clientInfo)
+      throws YarnException, IOException {
+    if (clientInfo.keystore != null && clientInfo.truststore != null) {
+      throw new BadCommandArgumentsException(
+          "Only one of either keystore or truststore can be retrieved at one time.  "
+          + "Retrieval of both should be done separately\n"
+          + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+    }
+
+    requireArgumentSet(Arguments.ARG_NAME, clientInfo.name);
+
+    File storeFile = null;
+    SecurityStore.StoreType type;
+    if (clientInfo.keystore != null) {
+      storeFile = clientInfo.keystore;
+      type = SecurityStore.StoreType.keystore;
+    } else {
+      storeFile = clientInfo.truststore;
+      type = SecurityStore.StoreType.truststore;
+    }
+
+    require (!storeFile.exists(),
+        "File %s already exists.  Please remove that file or select a different file name.",
+         storeFile.getAbsolutePath());
+    String hostname = null;
+    if (type == SecurityStore.StoreType.keystore) {
+      hostname = clientInfo.hostname;
+      if (hostname == null) {
+        hostname = InetAddress.getLocalHost().getCanonicalHostName();
+        log.info("No hostname specified via command line. Using {}", hostname);
+      }
+    }
+
+    String password = clientInfo.password;
+    if (password == null) {
+      String provider = clientInfo.provider;
+      String alias = clientInfo.alias;
+      if (provider != null && alias != null) {
+        Configuration conf = new Configuration(getConfig());
+        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+        char[] chars = conf.getPassword(alias);
+        if (chars == null) {
+          CredentialProvider credentialProvider =
+              CredentialProviderFactory.getProviders(conf).get(0);
+          chars = readOnePassword(alias);
+          credentialProvider.createCredentialEntry(alias, chars);
+          credentialProvider.flush();
+        }
+        password = String.valueOf(chars);
+        Arrays.fill(chars, ' ');
+      } else {
+        log.info("No password and no provider/alias pair were provided, " +
+            "prompting for password");
+        // get a password
+        password = String.valueOf(readOnePassword(type.name()));
+      }
+    }
+
+    byte[] keystore = createClusterOperations(clientInfo.name)
+        .getClientCertificateStore(hostname, "client", password, type.name());
+    // persist to file
+    FileOutputStream storeFileOutputStream = null;
+    try {
+      storeFileOutputStream = new FileOutputStream(storeFile);
+      IOUtils.write(keystore, storeFileOutputStream);
+    } catch (Exception e) {
+      log.error("Unable to persist to file {}", storeFile);
+      throw e;
+    } finally {
+      if (storeFileOutputStream != null) {
+        storeFileOutputStream.close();
+      }
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  private int doClientInstall(ActionClientArgs clientInfo)
+      throws IOException, SliderException {
+
+    require(clientInfo.installLocation != null,
+          E_INVALID_INSTALL_LOCATION +"\n"
+          + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+    require(clientInfo.installLocation.exists(),
+        E_INSTALL_PATH_DOES_NOT_EXIST + ": " + clientInfo.installLocation.getAbsolutePath());
+
+    require(clientInfo.installLocation.isDirectory(),
+        E_INVALID_INSTALL_PATH + ": " + clientInfo.installLocation.getAbsolutePath());
+
+    File pkgFile;
+    File tmpDir = null;
+
+    require(isSet(clientInfo.packageURI) || isSet(clientInfo.name),
+        E_INVALID_APPLICATION_PACKAGE_LOCATION);
+    if (isSet(clientInfo.packageURI)) {
+      pkgFile = new File(clientInfo.packageURI);
+    } else {
+      Path appDirPath = sliderFileSystem.buildAppDefDirPath(clientInfo.name);
+      Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG);
+      require(sliderFileSystem.isFile(appDefPath),
+          E_INVALID_APPLICATION_PACKAGE_LOCATION);
+      tmpDir = Files.createTempDir();
+      pkgFile = new File(tmpDir, SliderKeys.DEFAULT_APP_PKG);
+      sliderFileSystem.copyHdfsFileToLocal(appDefPath, pkgFile);
+    }
+    require(pkgFile.isFile(),
+        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", pkgFile.getAbsolutePath());
+
+    JSONObject config = null;
+    if(clientInfo.clientConfig != null) {
+      try {
+        byte[] encoded = Files.toByteArray(clientInfo.clientConfig);
+        config = new JSONObject(new String(encoded, Charset.defaultCharset()));
+      } catch (JSONException jsonEx) {
+        log.error("Unable to read supplied configuration at {}: {}",
+            clientInfo.clientConfig, jsonEx);
+        log.debug("Unable to read supplied configuration at {}: {}",
+            clientInfo.clientConfig, jsonEx, jsonEx);
+        throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx);
+      }
+    }
+
+    // Only INSTALL is supported
+    AbstractClientProvider
+        provider = createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE);
+    provider.processClientOperation(sliderFileSystem,
+        getRegistryOperations(),
+        getConfig(),
+        "INSTALL",
+        clientInfo.installLocation,
+        pkgFile,
+        config,
+        clientInfo.name);
+    return EXIT_SUCCESS;
+  }
+
+
+  @Override
+  public int actionPackage(ActionPackageArgs actionPackageInfo)
+      throws YarnException, IOException {
+    initializeOutputStream(actionPackageInfo.out);
+    int exitCode = -1;
+    if (actionPackageInfo.help) {
+      exitCode = actionHelp(ACTION_PACKAGE);
+    }
+    if (actionPackageInfo.install) {
+      exitCode = actionPackageInstall(actionPackageInfo);
+    }
+    if (actionPackageInfo.delete) {
+      exitCode = actionPackageDelete(actionPackageInfo);
+    }
+    if (actionPackageInfo.list) {
+      exitCode = actionPackageList();
+    }
+    if (actionPackageInfo.instances) {
+      exitCode = actionPackageInstances();
+    }
+    finalizeOutputStream(actionPackageInfo.out);
+    if (exitCode != -1) {
+      return exitCode;
+    }
+    throw new BadCommandArgumentsException(
+        "Select valid package operation option");
+  }
+
+  private void initializeOutputStream(String outFile)
+      throws FileNotFoundException {
+    if (outFile != null) {
+      clientOutputStream = new PrintStream(new FileOutputStream(outFile));
+    } else {
+      clientOutputStream = System.out;
+    }
+  }
+
+  private void finalizeOutputStream(String outFile) {
+    if (outFile != null && clientOutputStream != null) {
+      clientOutputStream.flush();
+      clientOutputStream.close();
+    }
+    clientOutputStream = System.out;
+  }
+
+  private int actionPackageInstances() throws YarnException, IOException {
+    Map<String, Path> persistentInstances = sliderFileSystem
+        .listPersistentInstances();
+    if (persistentInstances.isEmpty()) {
+      log.info("No slider cluster specification available");
+      return EXIT_SUCCESS;
+    }
+    String pkgPathValue = sliderFileSystem
+        .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri()
+        .getPath();
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    Iterator<Map.Entry<String, Path>> instanceItr = persistentInstances
+        .entrySet().iterator();
+    log.info("List of applications with its package name and path");
+    println("%-25s  %15s  %30s  %s", "Cluster Name", "Package Name",
+        "Package Version", "Application Location");
+    while(instanceItr.hasNext()) {
+      Map.Entry<String, Path> entry = instanceItr.next();
+      String clusterName = entry.getKey();
+      Path clusterPath = entry.getValue();
+      AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+          clusterName, clusterPath);
+      Path appDefPath = null;
+      try {
+        appDefPath = new Path(
+            getApplicationDefinitionPath(instanceDefinition
+                .getAppConfOperations()));
+      } catch (BadConfigException e) {
+        // Invalid cluster state, so move on to next. No need to log anything
+        // as this is just listing of instances.
+        continue;
+      }
+      if (!appDefPath.isUriPathAbsolute()) {
+        appDefPath = new Path(fs.getHomeDirectory(), appDefPath);
+      }
+      String appDefPathStr = appDefPath.toUri().toString();
+      try {
+        if (appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath)) {
+          String packageName = appDefPath.getParent().getName();
+          String packageVersion = StringUtils.EMPTY;
+          if (instanceDefinition.isVersioned()) {
+            packageVersion = packageName;
+            packageName = appDefPath.getParent().getParent().getName();
+          }
+          println("%-25s  %15s  %30s  %s", clusterName, packageName,
+              packageVersion, appDefPathStr);
+        }
+      } catch (IOException e) {
+        log.debug("{} application definition path {} is not found.", clusterName, appDefPathStr);
+      }
+    }
+    return EXIT_SUCCESS;
+  }
+
+  private int actionPackageList() throws IOException {
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
+        StringUtils.EMPTY);
+    log.info("Package install path : {}", pkgPath);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    if (!sfs.isDirectory(pkgPath)) {
+      log.info("No package(s) installed");
+      return EXIT_SUCCESS;
+    }
+    FileStatus[] fileStatus = sfs.listStatus(pkgPath);
+    boolean hasPackage = false;
+    StringBuilder sb = new StringBuilder();
+    sb.append("List of installed packages:\n");
+    for (FileStatus fstat : fileStatus) {
+      if (fstat.isDirectory()) {
+        sb.append("\t").append(fstat.getPath().getName());
+        sb.append("\n");
+        hasPackage = true;
+      }
+    }
+    if (hasPackage) {
+      println(sb.toString());
+    } else {
+      log.info("No package(s) installed");
+    }
+    return EXIT_SUCCESS;
+  }
+
+  private void createSummaryMetainfoFile(Path srcFile, Path destFile,
+      boolean overwrite) throws IOException {
+    FileSystem srcFs = srcFile.getFileSystem(getConfig());
+    try (InputStream inputStreamJson = SliderUtils
+        .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.json");
+        InputStream inputStreamXml = SliderUtils
+            .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.xml");) {
+      InputStream inputStream = null;
+      Path summaryFileInFs = null;
+      if (inputStreamJson != null) {
+        inputStream = inputStreamJson;
+        summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
+            + ".metainfo.json");
+        log.info("Found JSON metainfo file in package");
+      } else if (inputStreamXml != null) {
+        inputStream = inputStreamXml;
+        summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
+            + ".metainfo.xml");
+        log.info("Found XML metainfo file in package");
+      }
+      if (inputStream != null) {
+        try (FSDataOutputStream dataOutputStream = sliderFileSystem
+            .getFileSystem().create(summaryFileInFs, overwrite)) {
+          log.info("Creating summary metainfo file");
+          IOUtils.copy(inputStream, dataOutputStream);
+        }
+      }
+    }
+  }
+
+  private int actionPackageInstall(ActionPackageArgs actionPackageArgs)
+      throws YarnException, IOException {
+    requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
+
+    Path srcFile = extractPackagePath(actionPackageArgs.packageURI);
+
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
+        actionPackageArgs.version);
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    if (!fs.exists(pkgPath)) {
+      fs.mkdirs(pkgPath);
+    }
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    require(actionPackageArgs.replacePkg || !fs.exists(fileInFs),
+        E_PACKAGE_EXISTS +" at  %s. Use --replacepkg to overwrite.", fileInFs.toUri());
+
+    log.info("Installing package {} to {} (overwrite set to {})", srcFile,
+        fileInFs, actionPackageArgs.replacePkg);
+    fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile, fileInFs);
+    createSummaryMetainfoFile(srcFile, fileInFs, actionPackageArgs.replacePkg);
+
+    String destPathWithHomeDir = Path
+        .getPathWithoutSchemeAndAuthority(fileInFs).toString();
+    String destHomeDir = Path.getPathWithoutSchemeAndAuthority(
+        fs.getHomeDirectory()).toString();
+    // a somewhat contrived approach to stripping out the home directory and any trailing
+    // separator; designed to work on windows and unix
+    String destPathWithoutHomeDir;
+    if (destPathWithHomeDir.startsWith(destHomeDir)) {
+      destPathWithoutHomeDir = destPathWithHomeDir.substring(destHomeDir.length());
+      if (destPathWithoutHomeDir.startsWith("/") || destPathWithoutHomeDir.startsWith("\\")) {
+        destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1);
+      }
+    } else {
+      destPathWithoutHomeDir = destPathWithHomeDir;
+    }
+    log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}",
+        destPathWithoutHomeDir);
+
+    return EXIT_SUCCESS;
+  }
+
+  private Path extractPackagePath(String packageURI)
+      throws BadCommandArgumentsException {
+    require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION);
+    File pkgFile = new File(packageURI);
+    require(pkgFile.isFile(),
+        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ":  " + pkgFile.getAbsolutePath());
+    return new Path(pkgFile.toURI());
+  }
+
+  private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws
+      YarnException, IOException {
+    requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
+
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
+        actionPackageArgs.version);
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", pkgPath.toUri());
+    log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath);
+
+    if(fs.delete(pkgPath, true)) {
+      log.info("Deleted package {} " + actionPackageArgs.name);
+      return EXIT_SUCCESS;
+    } else {
+      log.warn("Package deletion failed.");
+      return EXIT_NOT_FOUND;
+    }
+  }
+
+  @Override
+  public int actionUpdate(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+      YarnException, IOException {
+    buildInstanceDefinition(clustername, buildInfo, true, true);
+    return EXIT_SUCCESS; 
+  }
+
+  /**
+   * Build up the AggregateConfiguration for an application instance then
+   * persists it
+   * @param clustername name of the cluster
+   * @param buildInfo the arguments needed to build the cluster
+   * @param overwrite true if existing cluster directory can be overwritten
+   * @param liveClusterAllowed true if live cluster can be modified
+   * @throws YarnException
+   * @throws IOException
+   */
+
+  public void buildInstanceDefinition(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
+      boolean liveClusterAllowed) throws YarnException, IOException {
+    buildInstanceDefinition(clustername, buildInfo, overwrite,
+        liveClusterAllowed, false);
+  }
+
+  public void buildInstanceDefinition(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
+      boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
+      IOException {
+    // verify that a live cluster isn't there
+    validateClusterName(clustername);
+    verifyBindingsDefined();
+    if (!liveClusterAllowed) {
+      verifyNoLiveClusters(clustername, "Create");
+    }
+
+    Configuration conf = getConfig();
+    String registryQuorum = lookupZKQuorum();
+
+    Path appconfdir = buildInfo.getConfdir();
+    // Provider
+    String providerName = buildInfo.getProvider();
+    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
+    log.debug("Provider is {}", providerName);
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
+    AbstractClientProvider provider =
+      createClientProvider(providerName);
+    InstanceBuilder builder =
+      new InstanceBuilder(sliderFileSystem, 
+                          getConfig(),
+                          clustername);
+    
+    AggregateConf instanceDefinition = new AggregateConf();
+    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
+    //initial definition is set by the providers 
+    sliderAM.prepareInstanceConfiguration(instanceDefinition);
+    provider.prepareInstanceConfiguration(instanceDefinition);
+
+    //load in any specified on the command line
+    if (buildInfo.resources != null) {
+      try {
+        resources.mergeFile(buildInfo.resources,
+                            new ResourcesInputPropertiesValidator());
+
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+               "incorrect argument to %s: \"%s\" : %s ", 
+                                     Arguments.ARG_RESOURCES,
+                                     buildInfo.resources,
+                                     e.toString());
+      }
+    }
+    if (buildInfo.template != null) {
+      try {
+        appConf.mergeFile(buildInfo.template,
+                          new TemplateInputPropertiesValidator());
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+                                     "incorrect argument to %s: \"%s\" : %s ",
+                                     Arguments.ARG_TEMPLATE,
+                                     buildInfo.template,
+                                     e.toString());
+      }
+    }
+
+    if (isUpgradeFlow) {
+      ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
+      if (!upgradeInfo.force) {
+        validateClientAndClusterResource(clustername, resources);
+      }
+    }
+
+    //get the command line options
+    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
+    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
+
+    appConf.merge(cmdLineAppOptions);
+
+    AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
+    appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
+
+    // put the role counts into the resources file
+    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
+    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
+      String count = roleEntry.getValue();
+      String key = roleEntry.getKey();
+      log.info("{} => {}", key, count);
+      resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
+    }
+
+    //all CLI role options
+    Map<String, Map<String, String>> appOptionMap =
+      buildInfo.getCompOptionMap();
+    appConf.mergeComponents(appOptionMap);
+
+    //internal picks up core. values only
+    internal.propagateGlobalKeys(appConf, "slider.");
+    internal.propagateGlobalKeys(appConf, "internal.");
+
+    //copy over role. and yarn. values ONLY to the resources
+    if (PROPAGATE_RESOURCE_OPTION) {
+      resources.propagateGlobalKeys(appConf, "component.");
+      resources.propagateGlobalKeys(appConf, "role.");
+      resources.propagateGlobalKeys(appConf, "yarn.");
+      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
+    }
+
+    // resource component args
+    appConf.merge(cmdLineResourceOptions);
+    resources.merge(cmdLineResourceOptions);
+    resources.mergeComponents(buildInfo.getResourceCompOptionMap());
+
+    builder.init(providerName, instanceDefinition);
+    builder.propagateFilename();
+    builder.propagatePrincipals();
+    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
+                                       buildInfo.getAppHomeDir());
+    builder.setQueue(buildInfo.queue);
+
+    String quorum = buildInfo.getZKhosts();
+    if (isUnset(quorum)) {
+      quorum = registryQuorum;
+    }
+    if (isUnset(quorum)) {
+      throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
+    }
+    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
+        getUsername(),
+        clustername,
+        registryQuorum,
+        quorum);
+    String zookeeperRoot = buildInfo.getAppZKPath();
+
+    if (isSet(zookeeperRoot)) {
+      zkPaths.setAppPath(zookeeperRoot);
+    } else {
+      String createDefaultZkNode = appConf.getGlobalOptions()
+          .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+      if (createDefaultZkNode.equals("true")) {
+        String defaultZKPath = createZookeeperNode(clustername, false);
+        log.debug("ZK node created for application instance: {}", defaultZKPath);
+        if (defaultZKPath != null) {
+          zkPaths.setAppPath(defaultZKPath);
+        }
+      } else {
+        // create AppPath if default is being used
+        String defaultZKPath = createZookeeperNode(clustername, true);
+        log.debug("ZK node assigned to application instance: {}", defaultZKPath);
+        zkPaths.setAppPath(defaultZKPath);
+      }
+    }
+
+    builder.addZKBinding(zkPaths);
+
+    //then propagate any package URI
+    if (buildInfo.packageURI != null) {
+      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
+    }
+
+    propagatePythonExecutable(conf, instanceDefinition);
+
+    // make any substitutions needed at this stage
+    replaceTokens(appConf.getConfTree(), getUsername(), clustername);
+
+    // TODO: Refactor the validation code and persistence code
+    try {
+      persistInstanceDefinition(overwrite, appconfdir, builder);
+      appDefinitionPersister.persistPackages();
+
+    } catch (LockAcquireFailedException e) {
+      log.warn("Failed to get a Lock on {} : {}", builder, e, e);
+      throw new BadClusterStateException("Failed to save " + clustername
+                                         + ": " + e);
+    }
+
+    // providers to validate what there is
+    // TODO: Validation should be done before persistence
+    AggregateConf instanceDescription = builder.getInstanceDescription();
+    validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
+    validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
+  }
+
+  private void validateClientAndClusterResource(String clustername,
+      ConfTreeOperations clientResources) throws BadClusterStateException,
+      SliderException, IOException {
+    log.info("Validating upgrade resource definition with current cluster "
+        + "state (components and instance count)");
+    Map<String, Integer> clientComponentInstances = new HashMap<>();
+    for (String componentName : clientResources.getComponentNames()) {
+      if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
+        clientComponentInstances.put(componentName, clientResources
+            .getComponentOptInt(componentName,
+                COMPONENT_INSTANCES, -1));
+      }
+    }
+
+    AggregateConf clusterConf = null;
+    try {
+      clusterConf = loadPersistedClusterDescription(clustername);
+    } catch (LockAcquireFailedException e) {
+      log.warn("Failed to get a Lock on cluster resource : {}", e, e);
+      throw new BadClusterStateException(
+          "Failed to load client resource definition " + clustername + ": " + e, e);
+    }
+    Map<String, Integer> clusterComponentInstances = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> component : clusterConf
+        .getResources().components.entrySet()) {
+      if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
+        clusterComponentInstances.put(
+            component.getKey(),
+            Integer.decode(component.getValue().get(
+                COMPONENT_INSTANCES)));
+      }
+    }
+
+    // client and cluster should be an exact match
+    Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances
+        .entrySet().iterator();
+    while (clientComponentInstanceIt.hasNext()) {
+      Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next();
+      if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) {
+        // compare instance count now and remove from both maps if they match
+        if (clusterComponentInstances
+            .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry
+            .getValue().intValue()) {
+          clusterComponentInstances.remove(clientComponentInstanceEntry
+              .getKey());
+          clientComponentInstanceIt.remove();
+        }
+      }
+    }
+
+    if (!clientComponentInstances.isEmpty()
+        || !clusterComponentInstances.isEmpty()) {
+      log.error("Mismatch found in upgrade resource definition and cluster "
+          + "resource state");
+      if (!clientComponentInstances.isEmpty()) {
+        log.info("The upgrade resource definitions that do not match are:");
+        for (Map.Entry<String, Integer> clientComponentInstanceEntry : clientComponentInstances
+            .entrySet()) {
+          log.info("    Component Name: {}, Instance count: {}",
+              clientComponentInstanceEntry.getKey(),
+              clientComponentInstanceEntry.getValue());
+        }
+      }
+      if (!clusterComponentInstances.isEmpty()) {
+        log.info("The cluster resources that do not match are:");
+        for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances
+            .entrySet()) {
+          log.info("    Component Name: {}, Instance count: {}",
+              clusterComponentInstanceEntry.getKey(),
+              clusterComponentInstanceEntry.getValue());
+        }
+      }
+      throw new BadConfigException("Resource definition provided for "
+          + "upgrade does not match with that of the currently running "
+          + "cluster.\nIf you are aware of what you are doing, rerun the "
+          + "command with " + Arguments.ARG_FORCE + " option.");
+    }
+  }
+
+  protected void persistInstanceDefinition(boolean overwrite,
+                                         Path appconfdir,
+                                         InstanceBuilder builder)
+      throws IOException, SliderException, LockAcquireFailedException {
+    builder.persist(appconfdir, overwrite);
+  }
+
+  @VisibleForTesting
+  public static void replaceTokens(ConfTree conf,
+      String userName, String clusterName) throws IOException {
+    Map<String,String> newglobal = new HashMap<>();
+    for (Entry<String,String> entry : conf.global.entrySet()) {
+      newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
+          userName, clusterName));
+    }
+    conf.global.putAll(newglobal);
+
+    for (String component : conf.components.keySet()) {
+      Map<String,String> newComponent = new HashMap<>();
+      for (Entry<String,String> entry : conf.components.get(component).entrySet()) {
+        newComponent.put(entry.getKey(), replaceTokens(entry.getValue(),
+            userName, clusterName));
+      }
+      conf.components.get(component).putAll(newComponent);
+    }
+
+    Map<String,List<String>> newcred = new HashMap<>();
+    for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
+      List<String> resultList = new ArrayList<>();
+      for (String v : entry.getValue()) {
+        resultList.add(replaceTokens(v, userName, clusterName));
+      }
+      newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
+          resultList);
+    }
+    conf.credentials.clear();
+    conf.credentials.putAll(newcred);
+  }
+
+  private static String replaceTokens(String s, String userName,
+      String clusterName) throws IOException {
+    return s.replaceAll(Pattern.quote("${USER}"), userName)
+        .replaceAll(Pattern.quote("${USER_NAME}"), userName)
+        .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName);
+  }
+
+  public FsPermission getClusterDirectoryPermissions(Configuration conf) {
+    String clusterDirPermsOct =
+      conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
+    return new FsPermission(clusterDirPermsOct);
+  }
+
+  /**
+   * Verify that the Resource Manager is configured (on a non-HA cluster).
+   * with a useful error message
+   * @throws BadCommandArgumentsException the exception raised on an invalid config
+   */
+  public void verifyBindingsDefined() throws BadCommandArgumentsException {
+    InetSocketAddress rmAddr = getRmAddress(getConfig());
+    if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false)
+     && !isAddressDefined(rmAddr)) {
+      throw new BadCommandArgumentsException(
+        E_NO_RESOURCE_MANAGER
+        + " in the argument "
+        + Arguments.ARG_MANAGER
+        + " or the configuration property "
+        + YarnConfiguration.RM_ADDRESS 
+        + " value :" + rmAddr);
+    }
+  }
+
+  /**
+   * Load and start a cluster specification.
+   * This assumes that all validation of args and cluster state
+   * have already taken place
+   *
+   * @param clustername name of the cluster.
+   * @param launchArgs launch arguments
+   * @return the exit code
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected int startCluster(String clustername,
+                           LaunchArgsAccessor launchArgs) throws
+                                                          YarnException,
+                                                          IOException {
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      clustername,
+      clusterDirectory);
+
+    LaunchedApplication launchedApplication =
+      launchApplication(clustername, clusterDirectory, instanceDefinition,
+                        serviceArgs.isDebug());
+
+    if (launchArgs.getOutputFile() != null) {
+      // output file has been requested. Get the app report and serialize it
+      ApplicationReport report =
+          launchedApplication.getApplicationReport();
+      SerializedApplicationReport sar = new SerializedApplicationReport(report);
+      sar.submitTime = System.currentTimeMillis();
+      ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser();
+      serDeser.save(sar, launchArgs.getOutputFile());
+    }
+    int waittime = launchArgs.getWaittime();
+    if (waittime > 0) {
+      return waitForAppRunning(launchedApplication, waittime, waittime);
+    } else {
+      // no waiting
+      return EXIT_SUCCESS;
+    }
+  }
+
+  /**
+   * Load the instance definition. It is not resolved at this point
+   * @param name cluster name
+   * @param clusterDirectory cluster dir
+   * @return the loaded configuration
+   * @throws IOException
+   * @throws SliderException
+   * @throws UnknownApplicationInstanceException if the file is not found
+   */
+  public AggregateConf loadInstanceDefinitionUnresolved(String name,
+            Path clusterDirectory) throws IOException, SliderException {
+
+    try {
+      AggregateConf definition =
+        InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
+                                                    clusterDirectory);
+      definition.setName(name);
+      return definition;
+    } catch (FileNotFoundException e) {
+      throw UnknownApplicationInstanceException.unknownInstance(name, e);
+    }
+  }
+
+  /**
+   * Load the instance definition. 
+   * @param name cluster name
+   * @param resolved flag to indicate the cluster should be resolved
+   * @return the loaded configuration
+   * @throws IOException IO problems
+   * @throws SliderException slider explicit issues
+   * @throws UnknownApplicationInstanceException if the file is not found
+   */
+    public AggregateConf loadInstanceDefinition(String name,
+        boolean resolved) throws
+        IOException,
+        SliderException {
+
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      name,
+      clusterDirectory);
+    if (resolved) {
+      instanceDefinition.resolve();
+    }
+    return instanceDefinition;
+
+  }
+
+  protected AppMasterLauncher setupAppMasterLauncher(String clustername,
+      Path clusterDirectory,
+      AggregateConf instanceDefinition,
+      boolean debugAM)
+    throws YarnException, IOException{
+    deployedClusterName = clustername;
+    validateClusterName(clustername);
+    verifyNoLiveClusters(clustername, "Launch");
+    Configuration config = getConfig();
+    lookupZKQuorum();
+    boolean clusterSecure = isHadoopClusterSecure(config);
+    //create the Slider AM provider -this helps set up the AM
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
+
+    instanceDefinition.resolve();
+    launchedInstanceDefinition = instanceDefinition;
+
+    ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations();
+    MapOperations internalOptions = internalOperations.getGlobalOptions();
+    ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations();
+    ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations();
+    Path generatedConfDirPath =
+      createPathThatMustExist(internalOptions.getMandatoryOption(
+        INTERNAL_GENERATED_CONF_PATH));
+    Path snapshotConfPath =
+      createPathThatMustExist(internalOptions.getMandatoryOption(
+        INTERNAL_SNAPSHOT_CONF_PATH));
+
+
+    // cluster Provider
+    AbstractClientProvider provider = createClientProvider(
+      internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
+    if (log.isDebugEnabled()) {
+      log.debug(instanceDefinition.toString());
+    }
+    MapOperations sliderAMResourceComponent =
+      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
+    MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions();
+
+    // add the tags if available
+    Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
+        getApplicationDefinitionPath(appOperations));
+
+    Credentials credentials = null;
+    if (clusterSecure) {
+      // pick up oozie credentials
+      credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
+          config);
+      if (credentials == null) {
+        // nothing from oozie, so build up directly
+        credentials = new Credentials(
+            UserGroupInformation.getCurrentUser().getCredentials());
+        CredentialUtils.addRMRenewableFSDelegationTokens(config,
+            sliderFileSystem.getFileSystem(),
+            credentials);
+        CredentialUtils.addRMDelegationToken(yarnClient, credentials);
+
+      } else {
+        log.info("Using externally supplied credentials to launch AM");
+      }
+    }
+
+    AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
+        SliderKeys.APP_TYPE,
+        config,
+        sliderFileSystem,
+        yarnClient,
+        clusterSecure,
+        sliderAMResourceComponent,
+        resourceGlobalOptions,
+        applicationTags,
+        credentials);
+
+    ApplicationId appId = amLauncher.getApplicationId();
+    // set the application name;
+    amLauncher.setKeepContainersOverRestarts(true);
+
+    int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
+    amLauncher.setMaxAppAttempts(maxAppAttempts);
+
+    sliderFileSystem.purgeAppInstanceTempFiles(clustername);
+    Path tempPath = sliderFileSystem.createAppInstanceTempPath(
+        clustername,
+        appId.toString() + "/am");
+    String libdir = "lib";
+    Path libPath = new Path(tempPath, libdir);
+    sliderFileSystem.getFileSystem().mkdirs(libPath);
+    log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath);
+ 
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master is part of the local resources
+    Map<String, LocalResource> localResources = amLauncher.getLocalResources();
+    
+    // look for the configuration directory named on the command line
+    boolean hasServerLog4jProperties = false;
+    Path remoteConfPath = null;
+    String relativeConfDir = null;
+    String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
+    if (isUnset(confdirProp)) {
+      log.debug("No local configuration directory provided as system property");
+    } else {
+      File confDir = new File(confdirProp);
+      if (!confDir.exists()) {
+        throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
+                                     confDir);
+      }
+      Path localConfDirPath = createLocalPath(confDir);
+      remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR);
+      log.debug("Slider configuration directory is {}; remote to be {}", 
+          localConfDirPath, remoteConfPath);
+      copyDirectory(config, localConfDirPath, remoteConfPath, null);
+
+      File log4jserver =
+          new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+      hasServerLog4jProperties = log4jserver.isFile();
+    }
+    // the assumption here is that minimr cluster => this is a test run
+    // and the classpath can look after itself
+
+    boolean usingMiniMRCluster = getUsingMiniMRCluster();
+    if (!usingMiniMRCluster) {
+
+      log.debug("Destination is not a MiniYARNCluster -copying full classpath");
+
+      // insert conf dir first
+      if (remoteConfPath != null) {
+        relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
+        Map<String, LocalResource> submittedConfDir =
+          sliderFileSystem.submitDirectory(remoteConfPath,
+                                         relativeConfDir);
+        mergeMaps(localResources, submittedConfDir);
+      }
+    }
+    // build up the configuration 
+    // IMPORTANT: it is only after this call that site configurations
+    // will be valid.
+
+    propagatePrincipals(config, instanceDefinition);
+    // validate security data
+
+/*
+    // turned off until tested
+    SecurityConfiguration securityConfiguration =
+        new SecurityConfiguration(config,
+            instanceDefinition, clustername);
+    
+*/
+    Configuration clientConfExtras = new Configuration(false);
+    // then build up the generated path.
+    FsPermission clusterPerms = getClusterDirectoryPermissions(config);
+    copyDirectory(config, snapshotConfPath, generatedConfDirPath,
+        clusterPerms);
+
+
+    // standard AM resources
+    sliderAM.prepareAMAnd

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org