Posted to common-commits@hadoop.apache.org by bi...@apache.org on 2017/08/14 21:13:52 UTC

[14/15] hadoop git commit: YARN-6903. Yarn-native-service framework core rewrite. Contributed by Jian He

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
new file mode 100644
index 0000000..bea2924
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
+import org.apache.hadoop.yarn.util.BoundedAppender;
+import org.apache.slider.api.RoleKeys;
+import org.apache.slider.api.ServiceApiConstants;
+import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.ConfigFile;
+import org.apache.hadoop.yarn.service.conf.SliderKeys;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.registry.info.CustomRegistryConstants;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.hadoop.yarn.service.provider.ProviderUtils;
+import org.apache.hadoop.yarn.service.metrics.ServiceMetrics;
+import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.ComponentEventType;
+import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.compinstance.ComponentInstanceEventType;
+import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
+import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
+import static org.apache.slider.api.ServiceApiConstants.*;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+
+/**
+ * The scheduler that manages the lifecycle of all components of a service:
+ * it requests and launches containers, and routes component and component
+ * instance events through dedicated dispatchers.
+ */
+public class ServiceScheduler extends CompositeService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ServiceScheduler.class);
+  private Application app;
+
+  // component_name -> component
+  private final Map<String, Component> componentsByName =
+      new ConcurrentHashMap<>();
+
+  // id -> component
+  private final Map<Long, Component> componentsById =
+      new ConcurrentHashMap<>();
+
+  private final Map<ContainerId, ComponentInstance> liveInstances =
+      new ConcurrentHashMap<>();
+
+  private ServiceMetrics serviceMetrics;
+
+  private ServiceTimelinePublisher serviceTimelinePublisher;
+
+  // Global diagnostics that will be reported to the RM on exit.
+  // The unit is the number of characters; the total is limited to
+  // 64 * 1024 characters.
+  private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
+
+  // A cache for loading config files from remote locations such as HDFS
+  public LoadingCache<ConfigFile, Object> configFileCache = null;
+
+  public ScheduledExecutorService executorService;
+  public Map<String, String> globalTokens = new HashMap<>();
+
+  private AMRMClientAsync<AMRMClient.ContainerRequest> amRMClient;
+  private NMClientAsync nmClient;
+  private AsyncDispatcher dispatcher;
+  AsyncDispatcher compInstanceDispatcher;
+  private YarnRegistryViewForProviders yarnRegistryOperations;
+  private ServiceContext context;
+  private ContainerLaunchService containerLaunchService;
+
+  public ServiceScheduler(ServiceContext context) {
+    super(context.application.getName());
+    this.context = context;
+  }
+
+  public void buildInstance(ServiceContext context, Configuration configuration)
+      throws YarnException {
+    app = context.application;
+    executorService = Executors.newScheduledThreadPool(10);
+    RegistryOperations registryClient = RegistryOperationsFactory
+        .createInstance("ServiceScheduler", configuration);
+    addIfService(registryClient);
+
+    // register metrics
+    serviceMetrics = ServiceMetrics
+        .register(app.getName(), "Metrics for service");
+    serviceMetrics.tag("type", "Metrics type [component or service]", "service");
+    serviceMetrics.tag("appId", "Application id for service", app.getId());
+
+    amRMClient =
+        AMRMClientAsync.createAMRMClientAsync(1000, new AMRMClientCallback());
+    addIfService(amRMClient);
+
+    nmClient = NMClientAsync.createNMClientAsync(new NMClientCallback());
+    addIfService(nmClient);
+
+    dispatcher = new AsyncDispatcher("Component dispatcher");
+    dispatcher.register(ComponentEventType.class,
+        new ComponentEventHandler());
+    dispatcher.setDrainEventsOnStop();
+    addIfService(dispatcher);
+
+    compInstanceDispatcher =
+        new AsyncDispatcher("CompInstance dispatcher");
+    compInstanceDispatcher.register(ComponentInstanceEventType.class,
+        new ComponentInstanceEventHandler());
+    addIfService(compInstanceDispatcher);
+    containerLaunchService = new ContainerLaunchService(context.fs);
+    addService(containerLaunchService);
+
+    if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
+      TimelineV2Client timelineClient = TimelineV2Client
+          .createTimelineClient(context.attemptId.getApplicationId());
+      amRMClient.registerTimelineV2Client(timelineClient);
+      serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient);
+      addService(serviceTimelinePublisher);
+      DefaultMetricsSystem.instance().register("ServiceMetricsSink",
+          "For processing metrics to ATS",
+          new ServiceMetricsSink(serviceTimelinePublisher));
+      LOG.info("Timeline v2 is enabled.");
+    }
+
+    yarnRegistryOperations =
+        new YarnRegistryViewForProviders(registryClient,
+            RegistryUtils.currentUser(), SliderKeys.APP_TYPE, app.getName(),
+            context.attemptId);
+    initGlobalTokensForSubstitute(context);
+    // substitute quicklinks
+    ProviderUtils.substituteMapWithTokens(app.getQuicklinks(), globalTokens);
+    createConfigFileCache(context.fs.getFileSystem());
+
+    createAllComponents();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    try {
+      buildInstance(context, conf);
+    } catch (YarnException e) {
+      throw new YarnRuntimeException(e);
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    LOG.info("Stopping service scheduler");
+
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+
+    DefaultMetricsSystem.shutdown();
+    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+      serviceTimelinePublisher.serviceAttemptUnregistered(context);
+    }
+    // Clean up each component instance. No need to release containers, as
+    // they will be released automatically by the RM.
+    for (ComponentInstance instance : liveInstances.values()) {
+      instance.cleanupRegistryAndCompHdfsDir();
+    }
+    String msg = diagnostics.toString()
+        + " Navigate to the failed component for more details.";
+    amRMClient
+        .unregisterApplicationMaster(FinalApplicationStatus.ENDED, msg, "");
+    LOG.info("Application " + app.getName()
+        + " unregistered with RM, with attemptId = " + context.attemptId
+        + ", diagnostics = " + diagnostics);
+    super.serviceStop();
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+    InetSocketAddress bindAddress = context.clientAMService.getBindAddress();
+    RegisterApplicationMasterResponse response = amRMClient
+        .registerApplicationMaster(bindAddress.getHostName(),
+            bindAddress.getPort(), "N/A");
+    if (response.getClientToAMTokenMasterKey() != null
+        && response.getClientToAMTokenMasterKey().remaining() != 0) {
+      context.secretManager
+          .setMasterKey(response.getClientToAMTokenMasterKey().array());
+    }
+    registerServiceInstance(context.attemptId, app);
+
+    // TODO handle container recovery
+  }
+
+  private void recover() {
+
+  }
+
+  private void initGlobalTokensForSubstitute(ServiceContext context) {
+    // ZK
+    globalTokens.put(ServiceApiConstants.CLUSTER_ZK_QUORUM, getConfig()
+        .getTrimmed(KEY_REGISTRY_ZK_QUORUM, DEFAULT_REGISTRY_ZK_QUORUM));
+    String user = null;
+    try {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      LOG.error("Failed to get user.", e);
+    }
+    globalTokens
+        .put(SERVICE_ZK_PATH, ZKIntegration.mkClusterPath(user, app.getName()));
+
+    globalTokens.put(ServiceApiConstants.USER, user);
+    String dnsDomain = getConfig().getTrimmed(KEY_DNS_DOMAIN);
+    if (dnsDomain != null && !dnsDomain.isEmpty()) {
+      globalTokens.put(ServiceApiConstants.DOMAIN, dnsDomain);
+    }
+    // HDFS
+    String clusterFs = getConfig().getTrimmed(FS_DEFAULT_NAME_KEY);
+    if (clusterFs != null && !clusterFs.isEmpty()) {
+      globalTokens.put(ServiceApiConstants.CLUSTER_FS_URI, clusterFs);
+      globalTokens.put(ServiceApiConstants.CLUSTER_FS_HOST,
+          URI.create(clusterFs).getHost());
+    }
+    globalTokens.put(SERVICE_HDFS_DIR, context.serviceHdfsDir);
+    // service name
+    globalTokens.put(SERVICE_NAME_LC, app.getName().toLowerCase());
+    globalTokens.put(SERVICE_NAME, app.getName());
+  }
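+
+  // For illustration (assuming the ServiceApiConstants tokens expand to
+  // ${...}-style placeholders): a quicklink value such as
+  //   "http://${SERVICE_NAME}.${DOMAIN}:8080"
+  // would have ${SERVICE_NAME} and ${DOMAIN} replaced with the values put
+  // into globalTokens when substituteMapWithTokens runs in buildInstance.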
+
+  private void createConfigFileCache(final FileSystem fileSystem) {
+    this.configFileCache =
+        CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
+            .build(new CacheLoader<ConfigFile, Object>() {
+              @Override public Object load(ConfigFile key) throws Exception {
+                switch (key.getType()) {
+                case HADOOP_XML:
+                  try (FSDataInputStream input = fileSystem
+                      .open(new Path(key.getSrcFile()))) {
+                    org.apache.hadoop.conf.Configuration confRead =
+                        new org.apache.hadoop.conf.Configuration(false);
+                    confRead.addResource(input);
+                    Map<String, String> map = new HashMap<>(confRead.size());
+                    for (Map.Entry<String, String> entry : confRead) {
+                      map.put(entry.getKey(), entry.getValue());
+                    }
+                    return map;
+                  }
+                case TEMPLATE:
+                  try (FSDataInputStream fileInput = fileSystem
+                      .open(new Path(key.getSrcFile()))) {
+                    return IOUtils.toString(fileInput);
+                  }
+                default:
+                  return null;
+                }
+              }
+            });
+    context.configCache = configFileCache;
+  }
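+
+  // For illustration (hypothetical lookup, not part of this patch): a
+  // component can resolve a ConfigFile through the shared cache instead of
+  // re-reading it from HDFS, e.g.
+  //   Map<String, String> props =
+  //       (Map<String, String>) context.configCache.get(configFile);
+  // HADOOP_XML entries load as a key/value map, TEMPLATE entries as a String.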
+
+  private void registerServiceInstance(ApplicationAttemptId attemptId,
+      Application application) throws IOException {
+    LOG.info("Registering " + attemptId + ", " + application.getName()
+        + " into registry");
+    ServiceRecord serviceRecord = new ServiceRecord();
+    serviceRecord.set(YarnRegistryAttributes.YARN_ID,
+        attemptId.getApplicationId().toString());
+    serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+        PersistencePolicies.APPLICATION);
+    serviceRecord.description = "Slider Application Master";
+
+    serviceRecord.addExternalEndpoint(RegistryTypeUtils
+        .ipcEndpoint(CustomRegistryConstants.AM_IPC_PROTOCOL,
+            new InetSocketAddress(5000))); // FIXME
+
+    // set any provided attributes
+    setUserProvidedServiceRecordAttributes(application.getConfiguration(),
+        serviceRecord);
+
+    executorService.submit(new Runnable() {
+      @Override public void run() {
+        try {
+          yarnRegistryOperations.registerSelf(serviceRecord, true);
+          LOG.info("Registered service under {}; absolute path {}",
+              yarnRegistryOperations.getSelfRegistrationPath(),
+              yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
+          boolean isFirstAttempt = 1 == attemptId.getAttemptId();
+          // On the first AM attempt, delete any existing children to make
+          // sure everything underneath the registration path is purged.
+          if (isFirstAttempt) {
+            yarnRegistryOperations.deleteChildren(
+                yarnRegistryOperations.getSelfRegistrationPath(), true);
+          }
+        } catch (IOException e) {
+          LOG.error(
+              "Failed to register app " + app.getName() + " in registry", e);
+        }
+      }
+    });
+    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+      serviceTimelinePublisher.serviceAttemptRegistered(app);
+    }
+  }
+
+  private void setUserProvidedServiceRecordAttributes(
+      org.apache.slider.api.resource.Configuration conf, ServiceRecord record) {
+    String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX;
+    for (Map.Entry<String, String> entry : conf.getProperties().entrySet()) {
+      if (entry.getKey().startsWith(prefix)) {
+        String key = entry.getKey().substring(prefix.length() + 1);
+        record.set(key, entry.getValue().trim());
+      }
+    }
+  }
+
+  private void createAllComponents() {
+    long allocateId = 0;
+
+    // sort components by dependencies
+    Collection<org.apache.slider.api.resource.Component> sortedComponents =
+        ServiceApiUtil.sortByDependencies(app.getComponents());
+
+    for (org.apache.slider.api.resource.Component compSpec : sortedComponents) {
+      Component component = new Component(compSpec, allocateId, context);
+      componentsById.put(allocateId, component);
+      componentsByName.put(component.getName(), component);
+      allocateId++;
+
+      // Trigger the component without dependencies
+      if (component.areDependenciesReady()) {
+        ComponentEvent event = new ComponentEvent(compSpec.getName(), FLEX)
+            .setDesired(compSpec.getNumberOfContainers());
+        component.handle(event);
+      }
+    }
+  }
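+
+  // For illustration: given components [zk, hbase] where hbase declares a
+  // dependency on zk, sortByDependencies returns [zk, hbase]; only zk, whose
+  // dependencies are trivially ready, receives the initial FLEX event, and
+  // hbase is flexed later, once its dependencies become ready.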
+
+  private final class ComponentEventHandler
+      implements EventHandler<ComponentEvent> {
+    @Override
+    public void handle(ComponentEvent event) {
+      Component component = componentsByName.get(event.getName());
+
+      if (component == null) {
+        LOG.error("No component exists for " + event.getName());
+        return;
+      }
+      try {
+        component.handle(event);
+      } catch (Throwable t) {
+        LOG.error("Error in handling event type " + event.getType()
+            + " for component " + event.getName(), t);
+      }
+    }
+  }
+
+  private final class ComponentInstanceEventHandler
+      implements EventHandler<ComponentInstanceEvent> {
+    @Override
+    public void handle(ComponentInstanceEvent event) {
+      ComponentInstance instance =
+          liveInstances.get(event.getContainerId());
+      if (instance == null) {
+        LOG.error("No component instance exists for " + event.getContainerId());
+        return;
+      }
+      try {
+        instance.handle(event);
+      } catch (Throwable t) {
+        LOG.error("Error in handling event type " + event.getType()
+            + " for component instance " + instance.getCompInstanceId(), t);
+      }
+    }
+  }
+
+  private class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler {
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+      LOG.info(containers.size() + " containers allocated.");
+      for (Container container : containers) {
+        Component comp = componentsById.get(container.getAllocationRequestId());
+        ComponentEvent event =
+            new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
+                .setContainer(container);
+        dispatcher.getEventHandler().handle(event);
+        LOG.info("[COMPONENT {}]: {} outstanding container requests.",
+            comp.getName(),
+            amRMClient.getMatchingRequests(container.getAllocationRequestId()).size());
+        // remove the corresponding request
+        Collection<AMRMClient.ContainerRequest> collection = amRMClient
+            .getMatchingRequests(container.getAllocationRequestId());
+        if (collection.iterator().hasNext()) {
+          AMRMClient.ContainerRequest request = collection.iterator().next();
+          amRMClient.removeContainerRequest(request);
+        }
+
+      }
+    }
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+      for (ContainerStatus status : statuses) {
+        ContainerId containerId = status.getContainerId();
+        ComponentInstance instance = liveInstances.get(status.getContainerId());
+        if (instance == null) {
+          LOG.error(
+              "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
+              containerId, status.getExitStatus(), status.getDiagnostics());
+          return;
+        }
+        ComponentEvent event =
+            new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED)
+                .setStatus(status).setInstance(instance);
+        dispatcher.getEventHandler().handle(event);
+      }
+    }
+
+    @Override
+    public void onContainersUpdated(List<UpdatedContainer> containers) {
+    }
+
+    @Override public void onShutdownRequest() {
+      // Was used for non-work-preserving restart in YARN; should be deprecated.
+    }
+
+    @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {
+      StringBuilder str = new StringBuilder();
+      str.append("Nodes updated info: ").append(System.lineSeparator());
+      for (NodeReport report : updatedNodes) {
+        str.append(report.getNodeId()).append(", state = ")
+            .append(report.getNodeState()).append(", healthDiagnostics = ")
+            .append(report.getHealthReport()).append(System.lineSeparator());
+      }
+      LOG.warn(str.toString());
+    }
+
+    @Override public float getProgress() {
+      // Progress is the ratio of running containers to desired containers.
+      long total = 0;
+      for (org.apache.slider.api.resource.Component component : app
+          .getComponents()) {
+        total += component.getNumberOfContainers();
+      }
+      // Total can be 0 if the user flexed all components down to 0.
+      if (total == 0) {
+        return 100;
+      }
+      return Math.min((float) liveInstances.size() / total * 100, 100);
+    }
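+
+    // For illustration: with 3 live instances out of a desired total of 4
+    // containers, getProgress() returns min(3f / 4 * 100, 100) = 75.0f.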
+
+    @Override public void onError(Throwable e) {
+      LOG.error("Error in AMRMClient callback handler ", e);
+    }
+  }
+
+
+  private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
+
+    @Override public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      ComponentInstance instance = liveInstances.get(containerId);
+      if (instance == null) {
+        LOG.error("No component instance exists for " + containerId);
+        return;
+      }
+      ComponentEvent event =
+          new ComponentEvent(instance.getCompName(), CONTAINER_STARTED)
+              .setInstance(instance);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+
+    }
+
+    @Override public void onContainerStopped(ContainerId containerId) {
+
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      ComponentInstance instance = liveInstances.get(containerId);
+      if (instance == null) {
+        LOG.error("No component instance exists for " + containerId);
+        return;
+      }
+      amRMClient.releaseAssignedContainer(containerId);
+      // After the container is released, a CONTAINER_COMPLETED event will
+      // arrive from the RM automatically, which triggers stopping the
+      // component instance.
+    }
+
+    @Override public void onContainerResourceIncreased(ContainerId containerId,
+        Resource resource) {
+
+    }
+
+    @Override public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+
+    }
+
+    @Override
+    public void onIncreaseContainerResourceError(ContainerId containerId,
+        Throwable t) {
+
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+
+    }
+  }
+
+  public ServiceMetrics getServiceMetrics() {
+    return serviceMetrics;
+  }
+
+  public AMRMClientAsync<AMRMClient.ContainerRequest> getAmRMClient() {
+    return amRMClient;
+  }
+
+  public NMClientAsync getNmClient() {
+    return nmClient;
+  }
+
+  public void addLiveCompInstance(ContainerId containerId,
+      ComponentInstance instance) {
+    liveInstances.put(containerId, instance);
+  }
+
+  public void removeLiveCompInstance(ContainerId containerId) {
+    liveInstances.remove(containerId);
+  }
+
+  public AsyncDispatcher getCompInstanceDispatcher() {
+    return compInstanceDispatcher;
+  }
+
+  public YarnRegistryViewForProviders getYarnRegistryOperations() {
+    return yarnRegistryOperations;
+  }
+
+  public ServiceTimelinePublisher getServiceTimelinePublisher() {
+    return serviceTimelinePublisher;
+  }
+
+  public Map<ContainerId, ComponentInstance> getLiveInstances() {
+    return liveInstances;
+  }
+
+  public ContainerLaunchService getContainerLaunchService() {
+    return containerLaunchService;
+  }
+
+  public ServiceContext getContext() {
+    return context;
+  }
+
+  public Map<String, Component> getAllComponents() {
+    return componentsByName;
+  }
+
+  public Application getApp() {
+    return app;
+  }
+
+  public AsyncDispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public BoundedAppender getDiagnostics() {
+    return diagnostics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java
new file mode 100644
index 0000000..dbc1f51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ClientAMProxy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.client.ServerProxy;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConfKeys;
+
+import java.net.InetSocketAddress;
+
+public class ClientAMProxy extends ServerProxy {
+
+  public static <T> T createProxy(final Configuration conf,
+      final Class<T> protocol, final UserGroupInformation ugi,
+      final YarnRPC rpc, final InetSocketAddress serverAddress) {
+
+    RetryPolicy retryPolicy =
+        createRetryPolicy(conf, YarnServiceConfKeys.CLIENT_AM_RETRY_MAX_WAIT_MS,
+            15 * 60 * 1000, YarnServiceConfKeys.CLIENT_AM_RETRY_MAX_INTERVAL_MS,
+            2 * 1000);
+    Configuration confClone = new Configuration(conf);
+    confClone.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    confClone.setInt(CommonConfigurationKeysPublic.
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
+    return createRetriableProxy(confClone, protocol, ugi, rpc, serverAddress,
+        retryPolicy);
+  }
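+
+  // Illustrative usage (hostAndPort and the surrounding variables are
+  // hypothetical, not part of this patch):
+  //   ClientAMProtocol proxy = ClientAMProxy.createProxy(conf,
+  //       ClientAMProtocol.class, UserGroupInformation.getCurrentUser(),
+  //       YarnRPC.create(conf), NetUtils.createSocketAddr(hostAndPort));
+  // The retry policy above waits up to 15 minutes by default, unless
+  // overridden via CLIENT_AM_RETRY_MAX_WAIT_MS.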
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceCLI.java
new file mode 100644
index 0000000..5574ebd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceCLI.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.slider.api.resource.Application;
+import org.apache.hadoop.yarn.service.client.params.ClientArgs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.service.client.params.SliderActions.*;
+
+public class ServiceCLI {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ServiceCLI.class);
+  protected ServiceClient client;
+
+  public int exec(ClientArgs args) throws Throwable {
+    if (StringUtils.isEmpty(args.getAction())) {
+      System.out.println(args.usage());
+      return -1;
+    }
+    switch (args.getAction()) {
+    case ACTION_BUILD: // Upload app json onto hdfs
+      client.actionBuild(args.getActionBuildArgs());
+      break;
+    case ACTION_START: // start the app with the pre-uploaded app json on hdfs
+      client.actionStart(args.getClusterName());
+      break;
+    case ACTION_CREATE: // create == build + start
+      client.actionCreate(args.getActionCreateArgs());
+      break;
+    case ACTION_STATUS:
+      Application app = client.getStatus(args.getClusterName());
+      System.out.println(app);
+      break;
+    case ACTION_FLEX:
+      client.actionFlexByCLI(args);
+      break;
+    case ACTION_STOP:
+      client.actionStop(args.getClusterName());
+      break;
+    case ACTION_DESTROY: // Destroy can happen only if app is already stopped
+      client.actionDestroy(args.getClusterName());
+      break;
+    case ACTION_DEPENDENCY: // upload dependency jars
+      client.actionDependency(args.getActionDependencyArgs());
+      break;
+    case ACTION_UPDATE:
+      client.updateLifetime(args.getClusterName(),
+          args.getActionUpdateArgs().lifetime);
+      break;
+    case ACTION_HELP:
+      LOG.info(args.usage());
+      break;
+    default:
+      LOG.info("NOT IMPLEMENTED: " + args.getAction());
+      LOG.info(args.usage());
+      return -1;
+    }
+    return 0;
+  }
+
+  public ServiceCLI() {
+    createServiceClient();
+  }
+
+  protected void createServiceClient() {
+    client = new ServiceClient();
+    client.init(new YarnConfiguration());
+    client.start();
+  }
+
+  public static void main(String[] args) throws Throwable {
+    ClientArgs clientArgs = new ClientArgs(args);
+    clientArgs.parse();
+    ServiceCLI cli = new ServiceCLI();
+    int res = cli.exec(clientArgs);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
new file mode 100644
index 0000000..3d02603
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -0,0 +1,836 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
+import org.apache.hadoop.yarn.service.ClientAMProtocol;
+import org.apache.hadoop.yarn.service.ServiceMaster;
+import org.apache.hadoop.yarn.service.client.params.ActionDependencyArgs;
+import org.apache.hadoop.yarn.service.client.params.ActionFlexArgs;
+import org.apache.hadoop.yarn.service.client.params.Arguments;
+import org.apache.hadoop.yarn.service.client.params.ClientArgs;
+import org.apache.hadoop.yarn.service.client.params.CommonArgs;
+import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
+import org.apache.hadoop.yarn.service.conf.SliderKeys;
+import org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys;
+import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
+import org.apache.hadoop.yarn.service.provider.ProviderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.Component;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UsageException;
+import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.JavaCommandLineBuilder;
+import org.apache.slider.core.registry.SliderRegistryUtils;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.slider.core.zk.ZookeeperUtils;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*;
+import static org.apache.hadoop.yarn.service.client.params.SliderActions.ACTION_CREATE;
+import static org.apache.hadoop.yarn.service.client.params.SliderActions.ACTION_FLEX;
+import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
+import static org.apache.slider.common.tools.SliderUtils.*;
+
+public class ServiceClient extends CompositeService
+    implements SliderExitCodes, SliderKeys {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ServiceClient.class);
+  private SliderFileSystem fs;
+  private YarnClient yarnClient;
+  // Avoid looking up applicationId from fs all the time.
+  private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
+  private RegistryOperations registryClient;
+  private CuratorFramework curatorClient;
+  private YarnRPC rpc;
+
+  private static EnumSet<YarnApplicationState> terminatedStates =
+      EnumSet.of(FINISHED, FAILED, KILLED);
+  private static EnumSet<YarnApplicationState> liveStates =
+      EnumSet.of(NEW, NEW_SAVING, SUBMITTED, RUNNING);
+
+  public ServiceClient() {
+    super(ServiceClient.class.getName());
+  }
+
+  @Override protected void serviceInit(Configuration configuration)
+      throws Exception {
+    fs = new SliderFileSystem(configuration);
+    yarnClient = YarnClient.createYarnClient();
+    rpc = YarnRPC.create(configuration);
+    addService(yarnClient);
+    super.serviceInit(configuration);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (registryClient != null) {
+      registryClient.stop();
+    }
+    super.serviceStop();
+  }
+
+  private Application loadAppJsonFromLocalFS(
+      AbstractClusterBuildingActionArgs args) throws IOException {
+    File file = args.getAppDef();
+    Path filePath = new Path(file.getAbsolutePath());
+    LOG.info("Loading app json from: " + filePath);
+    Application application = ServiceApiUtil.jsonSerDeser
+        .load(FileSystem.getLocal(getConfig()), filePath);
+    if (args.lifetime > 0) {
+      application.setLifetime(args.lifetime);
+    }
+    application.setName(args.getClusterName());
+    return application;
+  }
+
+  public int actionBuild(AbstractClusterBuildingActionArgs args)
+      throws IOException, YarnException {
+    return actionBuild(loadAppJsonFromLocalFS(args));
+  }
+
+  public int actionBuild(Application application)
+      throws YarnException, IOException {
+    Path appDir = checkAppNotExistOnHdfs(application);
+    ServiceApiUtil.validateAndResolveApplication(application, fs, getConfig());
+    createDirAndPersistApp(appDir, application);
+    return EXIT_SUCCESS;
+  }
+
+  public int actionCreate(AbstractClusterBuildingActionArgs args)
+      throws IOException, YarnException {
+    actionCreate(loadAppJsonFromLocalFS(args));
+    return EXIT_SUCCESS;
+  }
+
+  public ApplicationId actionCreate(Application application)
+      throws IOException, YarnException {
+    String appName = application.getName();
+    validateClusterName(appName);
+    ServiceApiUtil.validateAndResolveApplication(application, fs, getConfig());
+    verifyNoLiveAppInRM(appName, "create");
+    Path appDir = checkAppNotExistOnHdfs(application);
+
+    // Write the definition first and then submit - AM will read the definition
+    createDirAndPersistApp(appDir, application);
+    ApplicationId appId = submitApp(application);
+    cachedAppIds.put(appName, appId);
+    application.setId(appId.toString());
+    // update app definition with appId
+    persistAppDef(appDir, application);
+    return appId;
+  }
+
+  // Called by ServiceCLI
+  protected int actionFlexByCLI(ClientArgs args)
+      throws YarnException, IOException {
+    ActionFlexArgs flexArgs = args.getActionFlexArgs();
+    Map<String, Long> componentCounts =
+        new HashMap<>(flexArgs.getComponentMap().size());
+    Application persistedApp =
+        ServiceApiUtil.loadApplication(fs, flexArgs.getClusterName());
+    if (!StringUtils.isEmpty(persistedApp.getId())) {
+      cachedAppIds.put(persistedApp.getName(),
+          ApplicationId.fromString(persistedApp.getId()));
+    }
+    for (Map.Entry<String, String> entry : flexArgs.getComponentMap()
+        .entrySet()) {
+      String compName = entry.getKey();
+      ServiceApiUtil.validateCompName(compName);
+      Component component = persistedApp.getComponent(compName);
+      if (component == null) {
+        throw new IllegalArgumentException(compName + " does not exist!");
+      }
+      long numberOfContainers =
+          parseNumberOfContainers(component, entry.getValue());
+      componentCounts.put(compName, numberOfContainers);
+    }
+    // throw usage exception if no changes proposed
+    if (componentCounts.size() == 0) {
+      actionHelp(ACTION_FLEX, args);
+    }
+    flexComponents(args.getClusterName(), componentCounts, persistedApp);
+    return EXIT_SUCCESS;
+  }
+
+  // Parse the number of containers requested by the user, e.g.
+  // +5 means add 5 containers,
+  // -5 means remove 5 containers (a negative result is reset to 0),
+  // 5 means set the count to exactly 5 containers.
+  private long parseNumberOfContainers(Component component, String newNumber) {
+
+    long orig = component.getNumberOfContainers();
+    if (newNumber.startsWith("+")) {
+      return orig + Long.parseLong(newNumber.substring(1));
+    } else if (newNumber.startsWith("-")) {
+      long ret = orig - Long.parseLong(newNumber.substring(1));
+      if (ret < 0) {
+        LOG.warn(
+            "[COMPONENT {}]: component count goes negative ({}{} = {}), reset it to 0.",
+            component.getName(), orig, newNumber, ret);
+        ret = 0;
+      }
+      return ret;
+    } else {
+      return Long.parseLong(newNumber);
+    }
+  }
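+
+  // For illustration: assuming a component currently at 3 containers,
+  //   parseNumberOfContainers(comp, "+2") returns 5,
+  //   parseNumberOfContainers(comp, "-5") returns 0 (floored at zero),
+  //   parseNumberOfContainers(comp, "4")  returns 4.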
+
+  // Called by the REST service
+  public Map<String, Long> flexByRestService(String appName,
+      Map<String, Long> componentCounts) throws YarnException, IOException {
+    // load app definition
+    Application persistedApp = ServiceApiUtil.loadApplication(fs, appName);
+    cachedAppIds.put(persistedApp.getName(),
+        ApplicationId.fromString(persistedApp.getId()));
+    return flexComponents(appName, componentCounts, persistedApp);
+  }
+
+  private Map<String, Long> flexComponents(String appName,
+      Map<String, Long> componentCounts, Application persistedApp)
+      throws YarnException, IOException {
+    validateClusterName(appName);
+
+    Map<String, Long> original = new HashMap<>(componentCounts.size());
+
+    ComponentCountProto.Builder countBuilder = ComponentCountProto.newBuilder();
+    FlexComponentsRequestProto.Builder requestBuilder =
+        FlexComponentsRequestProto.newBuilder();
+
+    for (Component persistedComp : persistedApp.getComponents()) {
+      String name = persistedComp.getName();
+      if (componentCounts.containsKey(persistedComp.getName())) {
+        original.put(name, persistedComp.getNumberOfContainers());
+        persistedComp.setNumberOfContainers(componentCounts.get(name));
+
+        // build the request
+        countBuilder.setName(persistedComp.getName())
+            .setNumberOfContainers(persistedComp.getNumberOfContainers());
+        requestBuilder.addComponents(countBuilder.build());
+      }
+    }
+    if (original.size() < componentCounts.size()) {
+      componentCounts.keySet().removeAll(original.keySet());
+      throw new YarnException("Components " + componentCounts.keySet()
+          + " do not exist in app definition.");
+    }
+    ServiceApiUtil.jsonSerDeser
+        .save(fs.getFileSystem(), ServiceApiUtil.getAppJsonPath(fs, appName),
+            persistedApp, true);
+    ClientAMProtocol proxy = connectToAM(appName);
+    proxy.flexComponents(requestBuilder.build());
+    for (Map.Entry<String, Long> entry : original.entrySet()) {
+      LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
+          entry.getKey(), entry.getValue(),
+          componentCounts.get(entry.getKey()));
+    }
+    return original;
+  }
+
+  public int actionStop(String appName) throws YarnException, IOException {
+    validateClusterName(appName);
+    getAppIdFromPersistedApp(appName);
+    ApplicationId currentAppId = cachedAppIds.get(appName);
+    ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
+    if (terminatedStates.contains(report.getYarnApplicationState())) {
+      LOG.info("Application {} is already in a terminated state {}", appName,
+          report.getYarnApplicationState());
+      return EXIT_SUCCESS;
+    }
+    LOG.info("Stopping application {}, with appId = {}", appName, currentAppId);
+    try {
+      // try to stop the app gracefully.
+      ClientAMProtocol proxy = connectToAM(appName);
+      StopRequestProto request = StopRequestProto.newBuilder().build();
+      proxy.stop(request);
+      LOG.info("Application " + appName + " is being gracefully stopped...");
+
+      // Wait until the app is killed.
+      long startTime = System.currentTimeMillis();
+      int pollCount = 0;
+      while (true) {
+        Thread.sleep(1000);
+        report = yarnClient.getApplicationReport(currentAppId);
+        if (terminatedStates.contains(report.getYarnApplicationState())) {
+          LOG.info("Application " + appName + " is stopped.");
+          break;
+        }
+        // Forcefully kill after 10 seconds.
+        if ((System.currentTimeMillis() - startTime) > 10000) {
+          LOG.info("Stop operation timeout stopping, forcefully kill the app "
+              + appName);
+          yarnClient.killApplication(currentAppId,
+              "Forcefully kill the app by user");
+          break;
+        }
+        if (++pollCount % 10 == 0) {
+          LOG.info("Waiting for application " + appName + " to be stopped.");
+        }
+      }
+    } catch (IOException | YarnException | InterruptedException e) {
+      LOG.info("Failed to stop " + appName
+          + " gracefully, forcefully kill the app.");
+      yarnClient.killApplication(currentAppId, "Forcefully kill the app");
+    }
+    return EXIT_SUCCESS;
+  }
+
+  public int actionDestroy(String appName) throws Exception {
+    validateClusterName(appName);
+    verifyNoLiveAppInRM(appName, "Destroy");
+    Path appDir = fs.buildClusterDirPath(appName);
+    FileSystem fileSystem = fs.getFileSystem();
+    // remove from the appId cache
+    cachedAppIds.remove(appName);
+    if (fileSystem.exists(appDir)) {
+      if (fileSystem.delete(appDir, true)) {
+        LOG.info("Successfully deleted application dir for " + appName + ": "
+            + appDir);
+      } else {
+        String message =
+            "Failed to delete application + " + appName + " at:  " + appDir;
+        LOG.info(message);
+        throw new YarnException(message);
+      }
+    }
+    deleteZKNode(appName);
+    String registryPath = SliderRegistryUtils.registryPathForInstance(appName);
+    try {
+      getRegistryClient().delete(registryPath, true);
+    } catch (IOException e) {
+      LOG.warn("Error deleting registry entry {}", registryPath, e);
+    }
+    LOG.info("Destroyed cluster {}", appName);
+    return EXIT_SUCCESS;
+  }
+
+  private synchronized RegistryOperations getRegistryClient()
+      throws SliderException, IOException {
+
+    if (registryClient == null) {
+      registryClient =
+          RegistryOperationsFactory.createInstance("ServiceClient", getConfig());
+      registryClient.init(getConfig());
+      registryClient.start();
+    }
+    return registryClient;
+  }
+
+  private void deleteZKNode(String clusterName) throws Exception {
+    CuratorFramework curatorFramework = getCuratorClient();
+    String user = RegistryUtils.currentUser();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    if (curatorFramework.checkExists().forPath(zkPath) != null) {
+      curatorFramework.delete().deletingChildrenIfNeeded().forPath(zkPath);
+      LOG.info("Deleted zookeeper path: " + zkPath);
+    }
+  }
+
+  private synchronized CuratorFramework getCuratorClient()
+      throws BadConfigException {
+    String registryQuorum =
+        getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+
+    // fail fast if no quorum is configured
+    if (SliderUtils.isUnset(registryQuorum)) {
+      throw new BadConfigException(
+          "No Zookeeper quorum provided in the" + " configuration property "
+              + RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+    }
+    ZookeeperUtils.splitToHostsAndPortsStrictly(registryQuorum);
+
+    if (curatorClient == null) {
+      curatorClient =
+          CuratorFrameworkFactory.builder().connectString(registryQuorum)
+              .sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(10, 2000))
+              .build();
+      curatorClient.start();
+    }
+    return curatorClient;
+  }
+
+  private int actionHelp(String actionName, CommonArgs args)
+      throws YarnException, IOException {
+    throw new UsageException(CommonArgs.usage(args, actionName));
+  }
+
+  private void verifyNoLiveAppInRM(String appname, String action)
+      throws IOException, YarnException {
+    Set<String> types = new HashSet<>(1);
+    types.add(SliderKeys.APP_TYPE);
+    Set<String> tags = null;
+    if (appname != null) {
+      tags = Collections.singleton(SliderUtils.createNameTag(appname));
+    }
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance();
+    request.setApplicationTypes(types);
+    request.setApplicationTags(tags);
+    request.setApplicationStates(liveStates);
+    List<ApplicationReport> reports = yarnClient.getApplications(request);
+    if (!reports.isEmpty()) {
+      throw new YarnException(
+          "Failed to " + action + " application, as " + appname
+              + " already exists.");
+    }
+  }
+
+  private ApplicationId submitApp(Application app)
+      throws IOException, YarnException {
+    String appName = app.getName();
+    Configuration conf = getConfig();
+    Path appRootDir = fs.buildClusterDirPath(app.getName());
+
+    YarnClientApplication yarnApp = yarnClient.createApplication();
+    ApplicationSubmissionContext submissionContext =
+        yarnApp.getApplicationSubmissionContext();
+    ServiceApiUtil.validateCompResourceSize(
+        yarnApp.getNewApplicationResponse().getMaximumResourceCapability(),
+        app);
+
+    submissionContext.setKeepContainersAcrossApplicationAttempts(true);
+    if (app.getLifetime() > 0) {
+      Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
+      appTimeout.put(ApplicationTimeoutType.LIFETIME, app.getLifetime());
+      submissionContext.setApplicationTimeouts(appTimeout);
+    }
+    submissionContext.setMaxAppAttempts(conf.getInt(KEY_AM_RESTART_LIMIT, 2));
+
+    Map<String, LocalResource> localResources = new HashMap<>();
+
+    // copy local slideram-log4j.properties to hdfs and add to localResources
+    boolean hasSliderAMLog4j =
+        addAMLog4jResource(appName, conf, localResources);
+    // copy jars to hdfs and add to localResources
+    addJarResource(appName, localResources);
+    // add keytab if in secure env
+    addKeytabResourceIfSecure(fs, localResources, conf, appName);
+    if (LOG.isDebugEnabled()) {
+      printLocalResources(localResources);
+    }
+    Map<String, String> env = addAMEnv(conf);
+
+    // create AM CLI
+    String cmdStr =
+        buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j);
+
+    submissionContext.setResource(Resource.newInstance(
+        conf.getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM), 1));
+    submissionContext.setQueue(conf.get(KEY_YARN_QUEUE, app.getQueue()));
+    submissionContext.setApplicationName(appName);
+    submissionContext.setApplicationType(SliderKeys.APP_TYPE);
+    Set<String> appTags =
+        AbstractClientProvider.createApplicationTags(appName, null, null);
+    if (!appTags.isEmpty()) {
+      submissionContext.setApplicationTags(appTags);
+    }
+    ContainerLaunchContext amLaunchContext =
+        Records.newRecord(ContainerLaunchContext.class);
+    amLaunchContext.setCommands(Collections.singletonList(cmdStr));
+    amLaunchContext.setEnvironment(env);
+    amLaunchContext.setLocalResources(localResources);
+    submissionContext.setAMContainerSpec(amLaunchContext);
+    yarnClient.submitApplication(submissionContext);
+    return submissionContext.getApplicationId();
+  }
+
+  private void printLocalResources(Map<String, LocalResource> map) {
+    LOG.debug("Added LocalResource for localization: ");
+    StringBuilder builder = new StringBuilder();
+    for (Map.Entry<String, LocalResource> entry : map.entrySet()) {
+      builder.append(entry.getKey()).append(" -> ")
+          .append(entry.getValue().getResource().getFile())
+          .append(System.lineSeparator());
+    }
+    LOG.debug(builder.toString());
+  }
+
+  private String buildCommandLine(String appName, Configuration conf,
+      Path appRootDir, boolean hasSliderAMLog4j) throws BadConfigException {
+    JavaCommandLineBuilder cli = new JavaCommandLineBuilder();
+    cli.forceIPv4().headless();
+    //TODO cli.setJVMHeap
+    //TODO cli.addJVMOPTS
+    if (hasSliderAMLog4j) {
+      cli.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
+      cli.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    }
+    cli.add(ServiceMaster.class.getCanonicalName());
+    cli.add(ACTION_CREATE, appName);
+    //TODO debugAM cli.add(Arguments.ARG_DEBUG)
+    cli.add(Arguments.ARG_CLUSTER_URI, new Path(appRootDir, appName + ".json"));
+    // pass the registry binding
+    cli.addConfOptionToCLI(conf, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+        RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+    cli.addMandatoryConfOption(conf, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+
+    // redirect AM stdout/stderr to the designated log files
+    cli.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
+    String cmdStr = cli.build();
+    LOG.info("AM launch command: {}", cmdStr);
+    return cmdStr;
+  }
+
+  private Map<String, String> addAMEnv(Configuration conf) throws IOException {
+    Map<String, String> env = new HashMap<>();
+    ClasspathConstructor classpath =
+        buildClasspath(SliderKeys.SUBMITTED_CONF_DIR, "lib", fs, getConfig()
+            .getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false));
+    env.put("CLASSPATH", classpath.buildClasspath());
+    env.put("LANG", "en_US.UTF-8");
+    env.put("LC_ALL", "en_US.UTF-8");
+    env.put("LANGUAGE", "en_US.UTF-8");
+    String jaas = System.getenv(HADOOP_JAAS_DEBUG);
+    if (jaas != null) {
+      env.put(HADOOP_JAAS_DEBUG, jaas);
+    }
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      String userName = UserGroupInformation.getCurrentUser().getUserName();
+      LOG.info("Run as user " + userName);
+      // HADOOP_USER_NAME env is used by UserGroupInformation when log in
+      // This env makes AM run as this user
+      env.put("HADOOP_USER_NAME", userName);
+    }
+    LOG.info("AM env: \n{}", stringifyMap(env));
+    return env;
+  }
+
+  protected Path addJarResource(String appName,
+      Map<String, LocalResource> localResources)
+      throws IOException, SliderException {
+    Path libPath = fs.buildClusterDirPath(appName);
+    ProviderUtils
+        .addProviderJar(localResources, ServiceMaster.class, SLIDER_JAR, fs,
+            libPath, "lib", false);
+    Path dependencyLibTarGzip = fs.getDependencyTarGzip();
+    if (fs.isFile(dependencyLibTarGzip)) {
+      LOG.info("Loading lib tar from " + fs.getFileSystem().getScheme() + ": "
+          + dependencyLibTarGzip);
+      SliderUtils.putAmTarGzipAndUpdate(localResources, fs);
+    } else {
+      String[] libs = SliderUtils.getLibDirs();
+      for (String libDirProp : libs) {
+        ProviderUtils.addAllDependencyJars(localResources, fs, libPath, "lib",
+            libDirProp);
+      }
+    }
+    return libPath;
+  }
+
+  private boolean addAMLog4jResource(String appName, Configuration conf,
+      Map<String, LocalResource> localResources)
+      throws IOException, BadClusterStateException {
+    boolean hasSliderAMLog4j = false;
+    String hadoopConfDir =
+        System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
+    if (hadoopConfDir != null) {
+      File localFile =
+          new File(hadoopConfDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+      if (localFile.exists()) {
+        Path localFilePath = createLocalPath(localFile);
+        Path appDirPath = fs.buildClusterDirPath(appName);
+        Path remoteConfPath =
+            new Path(appDirPath, SliderKeys.SUBMITTED_CONF_DIR);
+        Path remoteFilePath =
+            new Path(remoteConfPath, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
+        copy(conf, localFilePath, remoteFilePath);
+        // localize the uploaded log4j file itself (FILE type), not its
+        // parent conf directory
+        LocalResource localResource =
+            fs.createAmResource(remoteFilePath, LocalResourceType.FILE);
+        localResources.put(localFilePath.getName(), localResource);
+        hasSliderAMLog4j = true;
+      }
+    }
+    return hasSliderAMLog4j;
+  }
+
+  public int actionStart(String appName) throws YarnException, IOException {
+    validateClusterName(appName);
+    Path appDir = checkAppExistOnHdfs(appName);
+    Application application = ServiceApiUtil.loadApplication(fs, appName);
+    ServiceApiUtil.validateAndResolveApplication(application, fs, getConfig());
+    // bail out if the application is already live in the RM
+    verifyNoLiveAppInRM(appName, "thaw");
+    ApplicationId appId = submitApp(application);
+    application.setId(appId.toString());
+    // persist the app definition to hdfs
+    createDirAndPersistApp(appDir, application);
+    return 0;
+  }
+
+  private Path checkAppNotExistOnHdfs(Application application)
+      throws IOException, SliderException {
+    Path appDir = fs.buildClusterDirPath(application.getName());
+    fs.verifyDirectoryNonexistent(
+        new Path(appDir, application.getName() + ".json"));
+    return appDir;
+  }
+
+  private Path checkAppExistOnHdfs(String appName)
+      throws IOException, SliderException {
+    Path appDir = fs.buildClusterDirPath(appName);
+    fs.verifyPathExists(new Path(appDir, appName + ".json"));
+    return appDir;
+  }
+
+  private void createDirAndPersistApp(Path appDir, Application application)
+      throws IOException, SliderException {
+    FsPermission appDirPermission = new FsPermission("750");
+    fs.createWithPermissions(appDir, appDirPermission);
+    persistAppDef(appDir, application);
+  }
+
+  private void persistAppDef(Path appDir, Application application)
+      throws IOException {
+    Path appJson = new Path(appDir, application.getName() + ".json");
+    ServiceApiUtil.jsonSerDeser
+        .save(fs.getFileSystem(), appJson, application, true);
+    LOG.info(
+        "Persisted application " + application.getName() + " at " + appJson);
+  }
+
+  private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
+      Map<String, LocalResource> localResource, Configuration conf,
+      String appName) throws IOException, BadConfigException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    String keytabPreInstalledOnHost =
+        conf.get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+    if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
+      String amKeytabName =
+          conf.get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+      String keytabDir = conf.get(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
+      Path keytabPath =
+          fileSystem.buildKeytabPath(keytabDir, amKeytabName, appName);
+      if (fileSystem.getFileSystem().exists(keytabPath)) {
+        LocalResource keytabRes =
+            fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
+        localResource
+            .put(SliderKeys.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
+        LOG.info("Adding AM keytab on hdfs: " + keytabPath);
+      } else {
+        LOG.warn("No keytab file was found at {}.", keytabPath);
+        if (conf.getBoolean(KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
+          throw new BadConfigException("No keytab file was found at %s.",
+              keytabPath);
+        } else {
+          LOG.warn("The AM will be "
+              + "started without a kerberos authenticated identity. "
+              + "The application is therefore not guaranteed to remain "
+              + "operational beyond 24 hours.");
+        }
+      }
+    }
+  }
+
+  public String updateLifetime(String appName, long lifetime)
+      throws YarnException, IOException {
+    getAppIdFromPersistedApp(appName);
+    ApplicationId currentAppId = cachedAppIds.get(appName);
+    ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
+    if (report == null) {
+      throw new YarnException("Application not found for " + appName);
+    }
+    ApplicationId appId = report.getApplicationId();
+    LOG.info("Updating lifetime of an application: appName = " + appName
+        + ", appId = " + appId + ", lifetime = " + lifetime);
+    Map<ApplicationTimeoutType, String> map = new HashMap<>();
+    String newTimeout =
+        Times.formatISO8601(System.currentTimeMillis() + lifetime * 1000);
+    map.put(ApplicationTimeoutType.LIFETIME, newTimeout);
+    UpdateApplicationTimeoutsRequest request =
+        UpdateApplicationTimeoutsRequest.newInstance(appId, map);
+    yarnClient.updateApplicationTimeouts(request);
+    LOG.info("Successfully updated lifetime of application {} (appId = {}); "
+        + "new expiry time in ISO8601 format is {}", appName, appId,
+        newTimeout);
+    return newTimeout;
+  }
+
+  public Application getStatus(String appName)
+      throws IOException, YarnException {
+    ClientAMProtocol proxy = connectToAM(appName);
+    GetStatusResponseProto response =
+        proxy.getStatus(GetStatusRequestProto.newBuilder().build());
+    return ServiceApiUtil.jsonSerDeser.fromJson(response.getStatus());
+  }
+
+  public YarnClient getYarnClient() {
+    return this.yarnClient;
+  }
+
+  public int actionDependency(ActionDependencyArgs args)
+      throws IOException, YarnException {
+    String currentUser = RegistryUtils.currentUser();
+    LOG.info("Running command as user {}", currentUser);
+
+    Path dependencyLibTarGzip = fs.getDependencyTarGzip();
+
+    // Check if dependency has already been uploaded, in which case log
+    // appropriately and exit success (unless overwrite has been requested)
+    if (fs.isFile(dependencyLibTarGzip) && !args.overwrite) {
+      System.out.println(String.format(
+          "Dependency libs are already uploaded to %s. Use %s "
+              + "if you want to re-upload", dependencyLibTarGzip.toUri(),
+          Arguments.ARG_OVERWRITE));
+      return EXIT_SUCCESS;
+    }
+
+    String[] libDirs = SliderUtils.getLibDirs();
+    if (libDirs.length > 0) {
+      File tempLibTarGzipFile = File.createTempFile(
+          SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_NAME + "_",
+          SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_EXT);
+      // copy all jars
+      tarGzipFolder(libDirs, tempLibTarGzipFile, createJarFilter());
+
+      LOG.info("Uploading dependency for AM (version {}) from {} to {}",
+          VersionInfo.getBuildVersion(), tempLibTarGzipFile.toURI(),
+          dependencyLibTarGzip.toUri());
+      fs.copyLocalFileToHdfs(tempLibTarGzipFile, dependencyLibTarGzip,
+          new FsPermission(SliderKeys.SLIDER_DEPENDENCY_DIR_PERMISSIONS));
+      return EXIT_SUCCESS;
+    } else {
+      return EXIT_FALSE;
+    }
+  }
+
+  protected ClientAMProtocol connectToAM(String appName)
+      throws IOException, YarnException {
+    ApplicationId currentAppId = getAppIdFromPersistedApp(appName);
+    // Wait until app becomes running.
+    long startTime = System.currentTimeMillis();
+    int pollCount = 0;
+    ApplicationReport appReport = null;
+    while (true) {
+      appReport = yarnClient.getApplicationReport(currentAppId);
+      YarnApplicationState state = appReport.getYarnApplicationState();
+      if (state == RUNNING) {
+        break;
+      }
+      if (terminatedStates.contains(state)) {
+        throw new YarnException(
+            "Application " + currentAppId + " reached terminal state " + state
+                + ": " + appReport.getDiagnostics());
+      }
+      long elapsedMillis = System.currentTimeMillis() - startTime;
+      // if over 5 min, quit
+      if (elapsedMillis >= 300000) {
+        throw new YarnException(
+            "Timed out while waiting for application " + currentAppId
+                + " to be running");
+      }
+
+      if (++pollCount % 10 == 0) {
+        LOG.info(
+            "Waiting for application {} to be running, current state is {}",
+            currentAppId, state);
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException ie) {
+        String msg =
+            "Interrupted while waiting for application " + currentAppId
+                + " to be running.";
+        throw new YarnException(msg, ie);
+      }
+    }
+
+    // Make the connection
+    InetSocketAddress address = NetUtils
+        .createSocketAddrForHost(appReport.getHost(), appReport.getRpcPort());
+    return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
+        UserGroupInformation.getCurrentUser(), rpc, address);
+  }
+
+  private synchronized ApplicationId getAppIdFromPersistedApp(String appName)
+      throws IOException, YarnException {
+    if (cachedAppIds.containsKey(appName)) {
+      return cachedAppIds.get(appName);
+    }
+    Application persistedApp = ServiceApiUtil.loadApplication(fs, appName);
+    if (persistedApp == null) {
+      throw new YarnException("Application " + appName
+          + " doesn't exist on hdfs. Please check if the app exists in RM");
+    }
+    ApplicationId currentAppId = ApplicationId.fromString(persistedApp.getId());
+    cachedAppIds.put(appName, currentAppId);
+    return currentAppId;
+  }
+}
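
For illustration: updateLifetime() above computes the new LIFETIME timeout as
"now + lifetime seconds" and renders it with Times.formatISO8601 before sending
the UpdateApplicationTimeoutsRequest. A minimal standalone sketch of that
computation follows; it is not part of this patch and assumes the conventional
ISO8601 pattern yyyy-MM-dd'T'HH:mm:ss.SSSZ for the formatted timestamp.

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class LifetimeExpiryDemo {
      public static void main(String[] args) {
        long lifetimeSecs = 3600L; // e.g. a one-hour lifetime
        // Mirror of updateLifetime(): expiry = now + lifetime seconds,
        // formatted as an ISO8601 timestamp for the LIFETIME timeout type.
        SimpleDateFormat iso8601 =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
        String newTimeout = iso8601.format(
            new Date(System.currentTimeMillis() + lifetimeSecs * 1000));
        System.out.println("LIFETIME expires at " + newTimeout);
      }
    }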

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/AbstractActionArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/AbstractActionArgs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/AbstractActionArgs.java
new file mode 100644
index 0000000..05c6501
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/AbstractActionArgs.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client.params;
+
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.slider.common.params.PathArgumentConverter;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.UsageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base arguments for all actions.
+ */
+public abstract class AbstractActionArgs extends ArgOps implements Arguments {
+  protected static final Logger log =
+    LoggerFactory.getLogger(AbstractActionArgs.class);
+
+  protected AbstractActionArgs() {
+  }
+
+  /**
+   * URI/binding to the filesystem
+   */
+  @Parameter(names = {ARG_FILESYSTEM, ARG_FILESYSTEM_LONG},
+             description = "Filesystem Binding")
+  public String filesystemBinding;
+
+  @Parameter(names = {ARG_BASE_PATH},
+             description = "Slider base path on the filesystem",
+             converter =  PathArgumentConverter.class)
+  public Path basePath;
+
+  /**
+   * This is the default parameter
+   */
+  @Parameter
+  public final List<String> parameters = new ArrayList<>();
+
+  /**
+   * Get the cluster name: relies on arg 1 being the cluster name in all
+   * operations.
+   * @return the name argument, null if there is none
+   */
+  public String getClusterName() {
+    return (parameters.isEmpty()) ? null : parameters.get(0);
+  }
+
+  /**
+   * -D name=value
+   *
+   * Define a configuration option which overrides any options in
+   * the configuration XML files of the image or in the image configuration
+   * directory. The values will be persisted.
+   * Configuration options are only passed to the cluster when creating or
+   * reconfiguring a cluster.
+   */
+  @Parameter(names = ARG_DEFINE, arity = 1, description = "Definitions")
+  public final List<String> definitions = new ArrayList<>();
+
+  /**
+   * System properties
+   */
+  @Parameter(names = {ARG_SYSPROP}, arity = 1,
+             description = "system properties in the form name value;" +
+                           " these are set after the JVM is started")
+  public final List<String> sysprops = new ArrayList<>(0);
+
+  @Parameter(names = {ARG_MANAGER_SHORT, ARG_MANAGER},
+             description = "Binding (usually hostname:port) of the YARN resource manager")
+  public String manager;
+
+  @Parameter(names = ARG_DEBUG, description = "Debug mode")
+  public boolean debug = false;
+
+  @Parameter(names = {ARG_HELP}, description = "Help", help = true)
+  public boolean help = false;
+
+  /**
+   * Get the minimum number of params expected.
+   * @return the min number of params in the {@link #parameters} field
+   */
+  public int getMinParams() {
+    return 1;
+  }
+
+  /**
+   * Get the name of the action.
+   * @return the action name
+   */
+  public abstract String getActionName();
+
+  /**
+   * Get the maximum number of params expected.
+   * @return the max number of params in the {@link #parameters} field
+   */
+  public int getMaxParams() {
+    return getMinParams();
+  }
+
+  public void validate() throws BadCommandArgumentsException, UsageException {
+    int minArgs = getMinParams();
+    int actionArgSize = parameters.size();
+    if (minArgs > actionArgSize) {
+      throw new BadCommandArgumentsException(
+        ErrorStrings.ERROR_NOT_ENOUGH_ARGUMENTS + getActionName() +
+        ": expected at least " + minArgs + " but got " + actionArgSize);
+    }
+    int maxArgs = getMaxParams();
+    if (maxArgs == -1) {
+      maxArgs = minArgs;
+    }
+    if (actionArgSize > maxArgs) {
+      String message = String.format("%s for action %s: limit is %d but saw %d: ",
+                                     ErrorStrings.ERROR_TOO_MANY_ARGUMENTS,
+                                     getActionName(), maxArgs,
+                                     actionArgSize);
+      log.error(message);
+      int index = 1;
+      StringBuilder buf = new StringBuilder(message);
+      for (String actionArg : parameters) {
+        log.error("[{}] \"{}\"", index++, actionArg);
+        buf.append(" \"").append(actionArg).append("\" ");
+      }
+      throw new BadCommandArgumentsException(buf.toString());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + ": " + getActionName();
+  }
+
+  /**
+   * Override point:
+   * flag to indicate that core Hadoop API services are needed (HDFS, YARN,
+   * etc.) and that validation of the client state should take place.
+   *
+   * @return a flag to indicate that the core Hadoop services will be needed.
+   */
+  public boolean getHadoopServicesRequired() {
+    return true;
+  }
+
+  /**
+   * Flag to disable secure login.
+   * This MUST only be set if the action bypasses security or handles
+   * login itself.
+   * @return true if login at slider client init time is to be skipped
+   */
+  public boolean disableSecureLogin() {
+    return false;
+  }
+}
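
The @Parameter bindings above are plain JCommander: the unnamed @Parameter
collects positional arguments (so getClusterName() is simply the first
positional), and list-valued options accumulate repeated occurrences. Below is
a minimal standalone sketch of that binding style; EchoArgs is a hypothetical
stand-in, not part of this patch, and only the jcommander library is assumed.

    import com.beust.jcommander.JCommander;
    import com.beust.jcommander.Parameter;

    import java.util.ArrayList;
    import java.util.List;

    public class ArgsBindingDemo {
      // Hypothetical mini args class mirroring the AbstractActionArgs style.
      static class EchoArgs {
        @Parameter // unnamed: collects positional arguments
        List<String> parameters = new ArrayList<>();

        @Parameter(names = "-D", arity = 1, description = "Definitions")
        List<String> definitions = new ArrayList<>();
      }

      public static void main(String[] args) {
        EchoArgs echo = new EchoArgs();
        new JCommander(echo).parse("mycluster", "-D", "yarn.queue=dev");
        // parameters.get(0) plays the role of getClusterName()
        System.out.println("cluster = " + echo.parameters.get(0));
        System.out.println("definitions = " + echo.definitions);
      }
    }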

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionBuildArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionBuildArgs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionBuildArgs.java
new file mode 100644
index 0000000..28381cf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionBuildArgs.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.client.params;
+
+import com.beust.jcommander.Parameters;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+
+@Parameters(commandNames = {SliderActions.ACTION_BUILD},
+            commandDescription = SliderActions.DESCRIBE_ACTION_BUILD)
+public class ActionBuildArgs extends AbstractClusterBuildingActionArgs {
+
+  @Override
+  public String getActionName() {
+    return SliderActions.ACTION_BUILD;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionCreateArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionCreateArgs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionCreateArgs.java
new file mode 100644
index 0000000..35cef5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionCreateArgs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client.params;
+
+import com.beust.jcommander.Parameters;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+
+@Parameters(commandNames = {SliderActions.ACTION_CREATE},
+            commandDescription = SliderActions.DESCRIBE_ACTION_CREATE)
+public class ActionCreateArgs extends AbstractClusterBuildingActionArgs {
+
+  @Override
+  public String getActionName() {
+    return SliderActions.ACTION_CREATE;
+  }
+}
+
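
ActionBuildArgs and ActionCreateArgs carry their command names in the
@Parameters annotation, so a CLI front end can register one instance per
action and let JCommander dispatch on the first token. A standalone sketch of
that pattern; the Create/Build stubs are hypothetical and not part of this
patch.

    import com.beust.jcommander.JCommander;
    import com.beust.jcommander.Parameter;
    import com.beust.jcommander.Parameters;

    import java.util.ArrayList;
    import java.util.List;

    public class CommandDispatchDemo {
      @Parameters(commandNames = "create", commandDescription = "create an app")
      static class Create {
        @Parameter
        List<String> parameters = new ArrayList<>();
      }

      @Parameters(commandNames = "build", commandDescription = "build an app spec")
      static class Build {
        @Parameter
        List<String> parameters = new ArrayList<>();
      }

      public static void main(String[] args) {
        JCommander jc = new JCommander();
        jc.addCommand(new Create()); // command name read from @Parameters
        jc.addCommand(new Build());
        jc.parse("create", "myapp");
        System.out.println("action = " + jc.getParsedCommand()); // "create"
      }
    }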

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDependencyArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDependencyArgs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDependencyArgs.java
new file mode 100644
index 0000000..b41b2af
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDependencyArgs.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.client.params;
+
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.UsageException;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+@Parameters(commandNames = { SliderActions.ACTION_DEPENDENCY },
+            commandDescription = SliderActions.DESCRIBE_ACTION_DEPENDENCY)
+public class ActionDependencyArgs extends AbstractActionArgs {
+
+  @Override
+  public String getActionName() {
+    return SliderActions.ACTION_DEPENDENCY;
+  }
+
+  @Parameter(names = { ARG_UPLOAD }, 
+             description = "Upload AM and agent libraries to HDFS for this client")
+  public boolean upload;
+
+  @Parameter(names = { ARG_OVERWRITE },
+             description = "Overwrite current uploaded dependency libs")
+  public boolean overwrite = false;
+
+  /**
+   * Get the minimum number of params expected.
+   *
+   * @return the min number of params in the {@link #parameters} field
+   */
+  @Override
+  public int getMinParams() {
+    return 0;
+  }
+
+  @Override
+  public int getMaxParams() {
+    return 1;
+  }
+
+  @Override
+  public void validate() throws BadCommandArgumentsException, UsageException {
+    super.validate();
+
+    if (!upload) {
+      throw new UsageException("Option " + ARG_UPLOAD + " is mandatory");
+    }
+  }
+}
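
validate() above makes the upload flag effectively mandatory with a
post-parse check rather than JCommander's required=true attribute, which lets
the class raise its own UsageException with a friendlier message. A standalone
analogue of that check follows; DepArgs and the --upload spelling are
illustrative assumptions, not part of this patch.

    import com.beust.jcommander.JCommander;
    import com.beust.jcommander.Parameter;

    public class MandatoryFlagDemo {
      static class DepArgs {
        @Parameter(names = "--upload", description = "Upload libraries")
        boolean upload;
      }

      public static void main(String[] args) {
        DepArgs dep = new DepArgs();
        new JCommander(dep).parse(args);
        // Post-parse check, mirroring ActionDependencyArgs.validate()
        if (!dep.upload) {
          throw new IllegalArgumentException("Option --upload is mandatory");
        }
        System.out.println("upload requested");
      }
    }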

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDestroyArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDestroyArgs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDestroyArgs.java
new file mode 100644
index 0000000..8c41c04
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionDestroyArgs.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client.params;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+@Parameters(commandNames = {SliderActions.ACTION_DESTROY},
+            commandDescription = SliderActions.DESCRIBE_ACTION_DESTROY)
+public class ActionDestroyArgs extends AbstractActionArgs {
+
+  @Override
+  public String getActionName() {
+    return SliderActions.ACTION_DESTROY;
+  }
+
+  @Parameter(names = {ARG_FORCE},
+             description = "Force the operation")
+  public boolean force;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionFlexArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionFlexArgs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionFlexArgs.java
new file mode 100644
index 0000000..fcbb803
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/client/params/ActionFlexArgs.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client.params;
+
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.ParametersDelegate;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+
+import java.util.List;
+import java.util.Map;
+
+@Parameters(commandNames = {SliderActions.ACTION_FLEX},
+            commandDescription = SliderActions.DESCRIBE_ACTION_FLEX)
+public class ActionFlexArgs extends AbstractActionArgs {
+
+  @Override
+  public String getActionName() {
+    return SliderActions.ACTION_FLEX;
+  }
+
+  @ParametersDelegate
+  public ComponentArgsDelegate componentDelegate = new ComponentArgsDelegate();
+
+  /**
+   * Get the component mapping (may be empty, but never null)
+   * @return mapping
+   * @throws BadCommandArgumentsException parse problem
+   */
+  public Map<String, String> getComponentMap() throws
+      BadCommandArgumentsException {
+    return componentDelegate.getComponentMap();
+  }
+
+  public List<String> getComponentTuples() {
+    return componentDelegate.getComponentTuples();
+  }
+
+}
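
getComponentMap() turns the flat tuple list collected by the delegate into a
name-to-value map. ComponentArgsDelegate itself is not part of this diff, so
the following is only a plausible standalone sketch of the pairing that the
method implies.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ComponentTuplesDemo {
      // Pair up [name, value, name, value, ...] into a map, the shape that
      // getComponentMap() returns for flex operations.
      static Map<String, String> toMap(List<String> tuples) {
        if (tuples.size() % 2 != 0) {
          throw new IllegalArgumentException("Expected name/value pairs: " + tuples);
        }
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < tuples.size(); i += 2) {
          map.put(tuples.get(i), tuples.get(i + 1));
        }
        return map;
      }

      public static void main(String[] args) {
        System.out.println(toMap(Arrays.asList("web", "2", "db", "1")));
      }
    }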

