Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/09/13 23:33:51 UTC

[64/82] [abbrv] hadoop git commit: YARN-7091. Rename application to service in yarn-native-services. Contributed by Jian He

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
new file mode 100644
index 0000000..cb7131e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
+import org.apache.hadoop.yarn.service.ContainerFailureTracker;
+import org.apache.hadoop.yarn.service.ServiceContext;
+import org.apache.hadoop.yarn.service.ServiceScheduler;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.ServiceMetrics;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
+import org.apache.hadoop.yarn.service.monitor.probe.Probe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
+import static org.apache.hadoop.yarn.service.component.ComponentState.*;
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;
+
+public class Component implements EventHandler<ComponentEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(Component.class);
+
+  private org.apache.hadoop.yarn.service.api.records.Component componentSpec;
+  private long allocateId;
+  private Priority priority;
+  private ServiceMetrics componentMetrics;
+  private ServiceScheduler scheduler;
+  private ServiceContext context;
+  private AMRMClientAsync<ContainerRequest> amrmClient;
+  private AtomicLong instanceIdCounter = new AtomicLong();
+  private Map<ComponentInstanceId, ComponentInstance> compInstances =
+      new ConcurrentHashMap<>();
+  // Component instances waiting to be assigned a container
+  private List<ComponentInstance> pendingInstances = new LinkedList<>();
+  private ContainerFailureTracker failureTracker;
+  private Probe probe;
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  public int maxContainerFailurePerComp;
+  // The number of containers that have failed since the last reset. This
+  // excludes preempted and disk-failed containers. It is reset to 0 periodically.
+  public AtomicInteger currentContainerFailure = new AtomicInteger(0);
+
+  private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
+      stateMachine;
+  private AsyncDispatcher compInstanceDispatcher;
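+  // State machine for the component lifecycle: a component starts in INIT,
+  // moves to FLEXING while container requests are outstanding, and settles in
+  // STABLE once the number of running containers matches the desired count.
+  // Container completions and flex-up requests move it back to FLEXING.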
+  private static final StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>
+      stateMachineFactory =
+      new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
+          INIT)
+          // INIT will only go to FLEXING
+          .addTransition(INIT, EnumSet.of(STABLE, FLEXING),
+              FLEX, new FlexComponentTransition())
+
+          // container allocated by RM
+          .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
+              new ContainerAllocatedTransition())
+          // container launched on NM
+          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
+              CONTAINER_STARTED, new ContainerStartedTransition())
+          // container failed while flexing
+          .addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
+              new ContainerCompletedTransition())
+          // Flex while previous flex is still in progress
+          .addTransition(FLEXING, EnumSet.of(FLEXING), FLEX,
+              new FlexComponentTransition())
+
+          // container failed while stable
+          .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
+              new ContainerCompletedTransition())
+          // Ignore surplus container
+          .addTransition(STABLE, STABLE, CONTAINER_ALLOCATED,
+              new ContainerAllocatedTransition())
+          // Flex by user
+          // For flex up, go to FLEXING state
+          // For flex down, go to STABLE state
+          .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
+              FLEX, new FlexComponentTransition())
+          .installTopology();
+
+  public Component(
+      org.apache.hadoop.yarn.service.api.records.Component component,
+      long allocateId, ServiceContext context) {
+    this.allocateId = allocateId;
+    this.priority = Priority.newInstance((int) allocateId);
+    this.componentSpec = component;
+    componentMetrics = ServiceMetrics.register(component.getName(),
+        "Metrics for component " + component.getName());
+    componentMetrics
+        .tag("type", "Metrics type [component or service]", "component");
+    this.scheduler = context.scheduler;
+    this.context = context;
+    amrmClient = scheduler.getAmRMClient();
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+    this.stateMachine = stateMachineFactory.make(this);
+    compInstanceDispatcher = scheduler.getCompInstanceDispatcher();
+    failureTracker =
+        new ContainerFailureTracker(context, this);
+    probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
+    maxContainerFailurePerComp = componentSpec.getConfiguration()
+        .getPropertyInt(CONTAINER_FAILURE_THRESHOLD, 10);
+    createNumCompInstances(component.getNumberOfContainers());
+  }
+
+  private void createNumCompInstances(long count) {
+    for (int i = 0; i < count; i++) {
+      createOneCompInstance();
+    }
+  }
+
+  private void createOneCompInstance() {
+    ComponentInstanceId id =
+        new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
+            componentSpec.getName());
+    ComponentInstance instance = new ComponentInstance(this, id);
+    compInstances.put(id, instance);
+    pendingInstances.add(instance);
+  }
+
+  private static class FlexComponentTransition implements
+      MultipleArcTransition<Component, ComponentEvent, ComponentState> {
+    // For flex up, go to FLEXING state
+    // For flex down, go to STABLE state
+    @Override
+    public ComponentState transition(Component component,
+        ComponentEvent event) {
+      component.setDesiredContainers((int)event.getDesired());
+      if (!component.areDependenciesReady()) {
+        LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
+            + " satisfied.", component.getName());
+        return component.getState();
+      }
+      if (component.getState() == INIT) {
+        // This happens on init
+        LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event
+            .getDesired() + " instances.");
+        component.requestContainers(event.getDesired());
+        return FLEXING;
+      }
+      long before = component.getComponentSpec().getNumberOfContainers();
+      long delta = event.getDesired() - before;
+      component.getComponentSpec().setNumberOfContainers(event.getDesired());
+      if (delta > 0) {
+        // Scale up
+        LOG.info("[FLEX UP COMPONENT " + component.getName() + "]: scaling up from "
+                + before + " to " + event.getDesired());
+        component.requestContainers(delta);
+        component.createNumCompInstances(delta);
+        return FLEXING;
+      } else if (delta < 0) {
+        delta = -delta;
+        // Scale down
+        LOG.info("[FLEX DOWN COMPONENT " + component.getName()
+            + "]: scaling down from " + before + " to " + event.getDesired());
+        List<ComponentInstance> list =
+            new ArrayList<>(component.compInstances.values());
+
+        // Sort in most-recent -> oldest order and destroy the most recent ones.
+        Collections.sort(list, Collections.reverseOrder());
+        for (int i = 0; i < delta; i++) {
+          ComponentInstance instance = list.get(i);
+          // remove the instance
+          component.compInstances.remove(instance.getCompInstanceId());
+          component.pendingInstances.remove(instance);
+          component.componentMetrics.containersFailed.incr();
+          component.componentMetrics.containersRunning.decr();
+          // decrement id counter
+          component.instanceIdCounter.decrementAndGet();
+          instance.destroy();
+        }
+        return STABLE;
+      } else {
+        LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
+            event.getDesired() + " instances, ignoring");
+        return STABLE;
+      }
+    }
+  }
+
+  private static class ContainerAllocatedTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      component.assignContainerToCompInstance(event.getContainer());
+    }
+  }
+
+  private static class ContainerStartedTransition implements
+      MultipleArcTransition<Component,ComponentEvent,ComponentState> {
+
+    @Override public ComponentState transition(Component component,
+        ComponentEvent event) {
+      component.compInstanceDispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getInstance().getContainerId(),
+              START));
+      component.incRunningContainers();
+      return checkIfStable(component);
+    }
+  }
+
+  private static ComponentState checkIfStable(Component component) {
+    // if desired == running
+    if (component.componentMetrics.containersRunning.value() == component
+        .getComponentSpec().getNumberOfContainers()) {
+      return STABLE;
+    } else {
+      return FLEXING;
+    }
+  }
+
+  private static class ContainerCompletedTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      component.updateMetrics(event.getStatus());
+
+      // add back to pending list
+      component.pendingInstances.add(event.getInstance());
+      LOG.info(
+          "[COMPONENT {}]: {} completed, num pending comp instances increased to {}.",
+          component.getName(), event.getStatus().getContainerId(),
+          component.pendingInstances.size());
+      component.compInstanceDispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getStatus().getContainerId(),
+              STOP).setStatus(event.getStatus()));
+    }
+  }
+
+  public ServiceMetrics getCompMetrics() {
+    return componentMetrics;
+  }
+
+  private void assignContainerToCompInstance(Container container) {
+    if (pendingInstances.size() == 0) {
+      LOG.info(
+          "[COMPONENT {}]: No pending component instance left, release surplus container {}",
+          getName(), container.getId());
+      scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
+      componentMetrics.surplusContainers.incr();
+      scheduler.getServiceMetrics().surplusContainers.incr();
+      return;
+    }
+    ComponentInstance instance = pendingInstances.remove(0);
+    LOG.info(
+        "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}",
+        getName(), container.getId(), pendingInstances.size());
+    instance.setContainer(container);
+    scheduler.addLiveCompInstance(container.getId(), instance);
+    LOG.info(
+        "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
+        getName(), container.getId(), instance.getCompInstanceName(),
+        container.getNodeId());
+    scheduler.getContainerLaunchService()
+        .launchCompInstance(scheduler.getApp(), instance, container);
+  }
+
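+  // Each request carries this component's priority and allocation request id
+  // so that containers allocated by the RM can be matched back to this
+  // component.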
+  @SuppressWarnings({ "unchecked" })
+  public void requestContainers(long count) {
+    Resource resource = Resource
+        .newInstance(componentSpec.getResource().getMemoryMB(),
+            componentSpec.getResource().getCpus());
+
+    for (int i = 0; i < count; i++) {
+      //TODO Once YARN-5468 is done, use that for anti-affinity
+      ContainerRequest request =
+          ContainerRequest.newBuilder().capability(resource).priority(priority)
+              .allocationRequestId(allocateId).relaxLocality(true).build();
+      amrmClient.addContainerRequest(request);
+    }
+  }
+
+  private void setDesiredContainers(int n) {
+    int delta = n - scheduler.getServiceMetrics().containersDesired.value();
+    if (delta > 0) {
+      scheduler.getServiceMetrics().containersDesired.incr(delta);
+    } else {
+      scheduler.getServiceMetrics().containersDesired.decr(-delta);
+    }
+    componentMetrics.containersDesired.set(n);
+  }
+
+  private void updateMetrics(ContainerStatus status) {
+    switch (status.getExitStatus()) {
+    case SUCCESS:
+      componentMetrics.containersSucceeded.incr();
+      scheduler.getServiceMetrics().containersSucceeded.incr();
+      return;
+    case PREEMPTED:
+      componentMetrics.containersPreempted.incr();
+      scheduler.getServiceMetrics().containersPreempted.incr();
+      break;
+    case DISKS_FAILED:
+      componentMetrics.containersDiskFailure.incr();
+      scheduler.getServiceMetrics().containersDiskFailure.incr();
+      break;
+    default:
+      break;
+    }
+
+    // containersFailed includes preempted and disk-failed containers, etc.
+    componentMetrics.containersFailed.incr();
+    scheduler.getServiceMetrics().containersFailed.incr();
+
+    // Decrement the running-container count.
+    decRunningContainers();
+
+    if (Apps.shouldCountTowardsNodeBlacklisting(status.getExitStatus())) {
+      String host = scheduler.getLiveInstances().get(status.getContainerId())
+          .getNodeId().getHost();
+      failureTracker.incNodeFailure(host);
+      currentContainerFailure.getAndIncrement();
+    }
+  }
+
+  public boolean areDependenciesReady() {
+    List<String> dependencies = componentSpec.getDependencies();
+    if (SliderUtils.isEmpty(dependencies)) {
+      return true;
+    }
+    for (String dependency : dependencies) {
+      Component dependentComponent =
+          scheduler.getAllComponents().get(dependency);
+      if (dependentComponent == null) {
+        LOG.error("Couldn't find dependency {} for {} (should never happen)",
+            dependency, getName());
+        continue;
+      }
+      if (dependentComponent.getNumReadyInstances() < dependentComponent
+          .getNumDesiredInstances()) {
+        LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
+                + " instances are ready.", getName(), dependency,
+            dependentComponent.getNumReadyInstances(),
+            dependentComponent.getNumDesiredInstances());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void incRunningContainers() {
+    componentMetrics.containersRunning.incr();
+    scheduler.getServiceMetrics().containersRunning.incr();
+  }
+
+  public void incContainersReady() {
+    componentMetrics.containersReady.incr();
+  }
+
+  public void decContainersReady() {
+    componentMetrics.containersReady.decr();
+  }
+
+  private void decRunningContainers() {
+    componentMetrics.containersRunning.decr();
+    scheduler.getServiceMetrics().containersRunning.decr();
+  }
+
+  public int getNumReadyInstances() {
+    return componentMetrics.containersReady.value();
+  }
+
+  public int getNumRunningInstances() {
+    return componentMetrics.containersRunning.value();
+  }
+
+  public int getNumDesiredInstances() {
+    return componentMetrics.containersDesired.value();
+  }
+
+  public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() {
+    return compInstances;
+  }
+
+  public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {
+    return this.componentSpec;
+  }
+
+  public void resetCompFailureCount() {
+    LOG.info("[COMPONENT {}]: Reset container failure count from {} to 0.",
+        getName(), currentContainerFailure.get());
+    currentContainerFailure.set(0);
+    failureTracker.resetContainerFailures();
+  }
+
+  public Probe getProbe() {
+    return probe;
+  }
+
+  public Priority getPriority() {
+    return priority;
+  }
+
+  public long getAllocateId() {
+    return allocateId;
+  }
+
+  public String getName() {
+    return componentSpec.getName();
+  }
+
+  public ComponentState getState() {
+    this.readLock.lock();
+
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public ServiceScheduler getScheduler() {
+    return scheduler;
+  }
+
+  @Override
+  public void handle(ComponentEvent event) {
+    try {
+      writeLock.lock();
+      ComponentState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitionException e) {
+        LOG.error(MessageFormat.format("[COMPONENT {0}]: Invalid event {1} at {2}",
+            componentSpec.getName(), event.getType(), oldState), e);
+      }
+      if (oldState != getState()) {
+        LOG.info("[COMPONENT {}] Transitioned from {} to {} on {} event.",
+            componentSpec.getName(), oldState, getState(), event.getType());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private static class BaseTransition implements
+      SingleArcTransition<Component, ComponentEvent> {
+
+    @Override public void transition(Component component,
+        ComponentEvent event) {
+    }
+  }
+
+  public ServiceContext getContext() {
+    return context;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
new file mode 100644
index 0000000..d93dcf1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+
+public class ComponentEvent extends AbstractEvent<ComponentEventType> {
+  private long desired;
+  private final String name;
+  private final ComponentEventType type;
+  private Container container;
+  private ComponentInstance instance;
+  private ContainerStatus status;
+
+  public ComponentEvent(String name, ComponentEventType type) {
+    super(type);
+    this.name = name;
+    this.type = type;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public ComponentEventType getType() {
+    return type;
+  }
+
+  public long getDesired() {
+    return desired;
+  }
+
+  public ComponentEvent setDesired(long desired) {
+    this.desired = desired;
+    return this;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+
+  public ComponentEvent setContainer(Container container) {
+    this.container = container;
+    return this;
+  }
+
+  public ComponentInstance getInstance() {
+    return instance;
+  }
+
+  public ComponentEvent setInstance(ComponentInstance instance) {
+    this.instance = instance;
+    return this;
+  }
+
+  public ContainerStatus getStatus() {
+    return status;
+  }
+
+  public ComponentEvent setStatus(ContainerStatus status) {
+    this.status = status;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
new file mode 100644
index 0000000..6729699
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+public enum ComponentEventType {
+  FLEX,
+  CONTAINER_ALLOCATED,
+  CONTAINER_STARTED,
+  CONTAINER_COMPLETED
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
new file mode 100644
index 0000000..a5f9ff4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+public enum ComponentState {
+  INIT,
+  FLEXING,
+  STABLE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
new file mode 100644
index 0000000..7d6525b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.service.ServiceScheduler;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BoundedAppender;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
+import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
+import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
+import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
+
+public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
+    Comparable<ComponentInstance> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ComponentInstance.class);
+
+  private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
+      ComponentInstanceEvent> stateMachine;
+  private Component component;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+
+  private ComponentInstanceId compInstanceId = null;
+  private Path compInstanceDir;
+  private Container container;
+  private YarnRegistryViewForProviders yarnRegistryOperations;
+  private FileSystem fs;
+  private boolean timelineServiceEnabled = false;
+  private ServiceTimelinePublisher serviceTimelinePublisher;
+  private ServiceScheduler scheduler;
+  private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
+  private volatile ScheduledFuture containerStatusFuture;
+  private volatile ContainerStatus status;
+  private long containerStartedTime = 0;
+  // This container object is used for REST API queries.
+  private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
+
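+  // Instance lifecycle: INIT -> STARTED when the container is launched,
+  // STARTED <-> READY as BECOME_READY / BECOME_NOT_READY events arrive, and
+  // back to INIT on STOP so the instance can be relaunched in a new container.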
+  private static final StateMachineFactory<ComponentInstance,
+      ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>
+      stateMachineFactory =
+      new StateMachineFactory<ComponentInstance, ComponentInstanceState,
+          ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
+      .addTransition(INIT, STARTED, START,
+          new ContainerStartedTransition())
+
+      //From Running
+      .addTransition(STARTED, INIT, STOP,
+          new ContainerStoppedTransition())
+      .addTransition(STARTED, READY, BECOME_READY,
+          new ContainerBecomeReadyTransition())
+
+      // FROM READY
+      .addTransition(READY, STARTED, BECOME_NOT_READY,
+          new ContainerBecomeNotReadyTransition())
+      .addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
+      .installTopology();
+
+  public ComponentInstance(Component component,
+      ComponentInstanceId compInstanceId) {
+    this.stateMachine = stateMachineFactory.make(this);
+    this.component = component;
+    this.compInstanceId = compInstanceId;
+    this.scheduler = component.getScheduler();
+    this.yarnRegistryOperations =
+        component.getScheduler().getYarnRegistryOperations();
+    this.serviceTimelinePublisher =
+        component.getScheduler().getServiceTimelinePublisher();
+    if (YarnConfiguration
+        .timelineServiceV2Enabled(component.getScheduler().getConfig())) {
+      this.timelineServiceEnabled = true;
+    }
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+    this.fs = scheduler.getContext().fs.getFileSystem();
+  }
+
+  private static class ContainerStartedTransition extends BaseTransition {
+    @Override public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+      // Query container status for IP and host
+      compInstance.containerStatusFuture =
+          compInstance.scheduler.executorService.scheduleAtFixedRate(
+              new ContainerStatusRetriever(compInstance.scheduler,
+                  compInstance.getContainerId(), compInstance), 0, 1,
+              TimeUnit.SECONDS);
+
+      org.apache.hadoop.yarn.service.api.records.Container container =
+          new org.apache.hadoop.yarn.service.api.records.Container();
+      container.setId(compInstance.getContainerId().toString());
+      container.setLaunchTime(new Date());
+      container.setState(ContainerState.RUNNING_BUT_UNREADY);
+      container.setBareHost(compInstance.container.getNodeId().getHost());
+      container.setComponentName(compInstance.getCompInstanceName());
+      if (compInstance.containerSpec != null) {
+        // remove the previous container.
+        compInstance.getCompSpec().removeContainer(compInstance.containerSpec);
+      }
+      compInstance.containerSpec = container;
+      compInstance.getCompSpec().addContainer(container);
+      compInstance.containerStartedTime = System.currentTimeMillis();
+
+      if (compInstance.timelineServiceEnabled) {
+        compInstance.serviceTimelinePublisher
+            .componentInstanceStarted(container, compInstance);
+      }
+    }
+  }
+
+  private static class ContainerBecomeReadyTransition extends BaseTransition {
+    @Override
+    public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+      compInstance.component.incContainersReady();
+      compInstance.containerSpec.setState(ContainerState.READY);
+    }
+  }
+
+  private static class ContainerBecomeNotReadyTransition extends BaseTransition {
+    @Override
+    public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+      compInstance.component.decContainersReady();
+      compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
+    }
+  }
+
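+  // On container completion: re-request a container, record diagnostics,
+  // clean up the registry entry and the live-instance mapping, and terminate
+  // the AM if the component's container failure threshold has been exceeded.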
+  private static class ContainerStoppedTransition extends BaseTransition {
+    @Override
+    public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+      // Re-request a container to replace the failed one.
+      Component comp = compInstance.component;
+      comp.requestContainers(1);
+      LOG.info(compInstance.getCompInstanceId()
+              + ": Container completed. Requested a new container." + System
+              .lineSeparator() + " exitStatus={}, diagnostics={}.",
+          event.getStatus().getExitStatus(),
+          event.getStatus().getDiagnostics());
+      String containerDiag =
+          compInstance.getCompInstanceId() + ": " + event.getStatus()
+              .getDiagnostics();
+      compInstance.diagnostics.append(containerDiag + System.lineSeparator());
+
+      boolean shouldExit = false;
+      // check if it exceeds the failure threshold
+      if (comp.currentContainerFailure.get() > comp.maxContainerFailurePerComp) {
+        String exitDiag = MessageFormat.format(
+            "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... "
+                + System.lineSeparator(),
+            comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
+        compInstance.diagnostics.append(exitDiag);
+        // append to global diagnostics that will be reported to RM.
+        comp.getScheduler().getDiagnostics().append(containerDiag);
+        comp.getScheduler().getDiagnostics().append(exitDiag);
+        LOG.warn(exitDiag);
+        shouldExit = true;
+      }
+
+      // Clean up the registry entry. The HDFS dir content will be overwritten
+      // when a new container starts, so there is no need to remove it.
+      compInstance.scheduler.executorService
+          .submit(compInstance::cleanupRegistry);
+
+      // remove the failed ContainerId -> CompInstance mapping
+      comp.getScheduler().removeLiveCompInstance(event.getContainerId());
+
+      if (compInstance.timelineServiceEnabled) {
+        // record in ATS
+        compInstance.serviceTimelinePublisher
+            .componentInstanceFinished(compInstance,
+                event.getStatus().getExitStatus(), event.getStatus().getState(),
+                containerDiag);
+      }
+
+      compInstance.containerSpec.setState(ContainerState.STOPPED);
+      if (shouldExit) {
+        // Sleep for 5 seconds in the hope that the state can be recorded in
+        // ATS, so that a client polling the component state can be notified.
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted on sleep while exiting.", e);
+        }
+        ExitUtil.terminate(-1);
+      }
+    }
+  }
+
+  public ComponentInstanceState getState() {
+    this.readLock.lock();
+
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public void handle(ComponentInstanceEvent event) {
+    try {
+      writeLock.lock();
+      ComponentInstanceState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitionException e) {
+        LOG.error(getCompInstanceId() + ": Invalid event " + event.getType() +
+            " at " + oldState, e);
+      }
+      if (oldState != getState()) {
+        LOG.info(getCompInstanceId() + " Transitioned from " + oldState + " to "
+            + getState() + " on " + event.getType() + " event");
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setContainer(Container container) {
+    this.container = container;
+    this.compInstanceId.setContainerId(container.getId());
+  }
+
+  public String getCompInstanceName() {
+    return compInstanceId.getCompInstanceName();
+  }
+
+  public ContainerStatus getContainerStatus() {
+    return status;
+  }
+
+  public void updateContainerStatus(ContainerStatus status) {
+    this.status = status;
+    org.apache.hadoop.yarn.service.api.records.Container container =
+        getCompSpec().getContainer(getContainerId().toString());
+    if (container != null) {
+      container.setIp(StringUtils.join(",", status.getIPs()));
+      container.setHostname(status.getHost());
+      if (timelineServiceEnabled) {
+        serviceTimelinePublisher.componentInstanceUpdated(container);
+      }
+    }
+    updateServiceRecord(yarnRegistryOperations, status);
+  }
+
+  public ContainerId getContainerId() {
+    return container.getId();
+  }
+
+  public String getCompName() {
+    return compInstanceId.getCompName();
+  }
+
+  public void setCompInstanceDir(Path dir) {
+    this.compInstanceDir = dir;
+  }
+
+  public Component getComponent() {
+    return component;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+
+  public ComponentInstanceId getCompInstanceId() {
+    return compInstanceId;
+  }
+
+  public NodeId getNodeId() {
+    return this.container.getNodeId();
+  }
+
+  public org.apache.hadoop.yarn.service.api.records.Component getCompSpec() {
+    return component.getComponentSpec();
+  }
+
+  private static class BaseTransition implements
+      SingleArcTransition<ComponentInstance, ComponentInstanceEvent> {
+
+    @Override public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+    }
+  }
+
+  public ProbeStatus ping() {
+    if (component.getProbe() == null) {
+      ProbeStatus status = new ProbeStatus();
+      status.setSuccess(true);
+      return status;
+    }
+    return component.getProbe().ping(this);
+  }
+
+  // Write service record into registry
+  private void updateServiceRecord(
+      YarnRegistryViewForProviders yarnRegistry, ContainerStatus status) {
+    ServiceRecord record = new ServiceRecord();
+    String containerId = status.getContainerId().toString();
+    record.set(YarnRegistryAttributes.YARN_ID, containerId);
+    record.description = getCompInstanceName();
+    record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+        PersistencePolicies.CONTAINER);
+    record.set("yarn:ip", status.getIPs());
+    record.set("yarn:hostname", status.getHost());
+    try {
+      yarnRegistry
+          .putComponent(RegistryPathUtils.encodeYarnID(containerId), record);
+    } catch (IOException e) {
+      LOG.error(
+          "Failed to update service record in registry: " + containerId, e);
+    }
+  }
+
+  // Release the container, clean up the registry entry and the HDFS dir,
+  // and record the event in ATS.
+  public void destroy() {
+    LOG.info(getCompInstanceId() + ": Flexed down by user, destroying.");
+    diagnostics.append(getCompInstanceId() + ": Flexed down by user");
+    if (container != null) {
+      scheduler.removeLiveCompInstance(container.getId());
+      component.getScheduler().getAmRMClient()
+          .releaseAssignedContainer(container.getId());
+      getCompSpec().removeContainer(containerSpec);
+    }
+    if (timelineServiceEnabled) {
+      serviceTimelinePublisher
+          .componentInstanceFinished(this, KILLED_BY_APPMASTER, COMPLETE,
+              diagnostics.toString());
+    }
+    scheduler.executorService.submit(this::cleanupRegistryAndCompHdfsDir);
+  }
+
+  private void cleanupRegistry() {
+    ContainerId containerId = getContainerId();
+    String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
+    try {
+       yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid);
+    } catch (IOException e) {
+      LOG.error(getCompInstanceId() + ": Failed to delete registry", e);
+    }
+  }
+
+  //TODO Maybe have a dedicated cleanup service.
+  public void cleanupRegistryAndCompHdfsDir() {
+    cleanupRegistry();
+    try {
+      if (compInstanceDir != null && fs.exists(compInstanceDir)) {
+        boolean deleted = fs.delete(compInstanceDir, true);
+        if (!deleted) {
+          LOG.error(getCompInstanceId()
+              + ": Failed to delete component instance dir: "
+              + compInstanceDir);
+        } else {
+          LOG.info(getCompInstanceId() + ": Deleted component instance dir: "
+              + compInstanceDir);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn(getCompInstanceId() + ": Failed to delete directory", e);
+    }
+  }
+
+  // Query container status until the IP and hostname are available, then
+  // update the service record in the registry service.
+  private static class ContainerStatusRetriever implements Runnable {
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private NMClient nmClient;
+    private ComponentInstance instance;
+    ContainerStatusRetriever(ServiceScheduler scheduler,
+        ContainerId containerId, ComponentInstance instance) {
+      this.containerId = containerId;
+      this.nodeId = instance.getNodeId();
+      this.nmClient = scheduler.getNmClient().getClient();
+      this.instance = instance;
+    }
+    @Override public void run() {
+      ContainerStatus status = null;
+      try {
+        status = nmClient.getContainerStatus(containerId, nodeId);
+      } catch (Exception e) {
+        if (e instanceof YarnException) {
+          throw new YarnRuntimeException(
+              instance.compInstanceId + " Failed to get container status on "
+                  + nodeId + " , cancelling.", e);
+        }
+        LOG.error(instance.compInstanceId + " Failed to get container status on "
+            + nodeId + ", will try again", e);
+        return;
+      }
+      if (SliderUtils.isEmpty(status.getIPs()) || SliderUtils
+          .isUnset(status.getHost())) {
+        return;
+      }
+      instance.updateContainerStatus(status);
+      LOG.info(
+          instance.compInstanceId + " IP = " + status.getIPs() + ", host = "
+              + status.getHost() + ", cancel container status retriever");
+      instance.containerStatusFuture.cancel(false);
+    }
+  }
+
+  @Override
+  public int compareTo(ComponentInstance to) {
+    long delta = containerStartedTime - to.containerStartedTime;
+    if (delta == 0) {
+      return getCompInstanceId().compareTo(to.getCompInstanceId());
+    } else if (delta < 0) {
+      return -1;
+    } else {
+      return 1;
+    }
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+
+    ComponentInstance instance = (ComponentInstance) o;
+
+    if (containerStartedTime != instance.containerStartedTime)
+      return false;
+    return compInstanceId.equals(instance.compInstanceId);
+  }
+
+  @Override public int hashCode() {
+    int result = compInstanceId.hashCode();
+    result = 31 * result + (int) (containerStartedTime ^ (containerStartedTime
+        >>> 32));
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java
new file mode 100644
index 0000000..707b034
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class ComponentInstanceEvent
+    extends AbstractEvent<ComponentInstanceEventType> {
+
+  private ContainerId id;
+  private ContainerStatus status;
+  private boolean shouldDestroy = false;
+
+  public ComponentInstanceEvent(ContainerId containerId,
+      ComponentInstanceEventType componentInstanceEventType) {
+    super(componentInstanceEventType);
+    this.id = containerId;
+  }
+
+  public ContainerId getContainerId() {
+    return id;
+  }
+
+  public ContainerStatus getStatus() {
+    return this.status;
+  }
+
+  public ComponentInstanceEvent setStatus(ContainerStatus status) {
+    this.status = status;
+    return this;
+  }
+
+  public void setShouldDestroy() {
+    shouldDestroy = true;
+  }
+
+  public boolean shouldDestroy() {
+    return shouldDestroy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
new file mode 100644
index 0000000..1a880ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+public enum ComponentInstanceEventType {
+  START,
+  STOP,
+  BECOME_READY,
+  BECOME_NOT_READY
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java
new file mode 100644
index 0000000..14387ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ComponentInstanceId implements Comparable<ComponentInstanceId> {
+
+  private long id;
+  private String name;
+  private ContainerId containerId;
+
+  public ComponentInstanceId(long id, String name) {
+    this.id = id;
+    this.name = name;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public String getCompName() {
+    return name;
+  }
+
+  public String getCompInstanceName() {
+    return getCompName() + "-" + getId();
+  }
+
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public String toString() {
+    if (containerId == null) {
+      return "[COMPINSTANCE " + getCompInstanceName() + "]";
+    } else {
+      return "[COMPINSTANCE " + getCompInstanceName() + " : " + containerId + "]";
+    }
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+
+    ComponentInstanceId that = (ComponentInstanceId) o;
+
+    if (getId() != that.getId())
+      return false;
+    return getCompName() != null ? getCompName().equals(that.getCompName()) :
+        that.getCompName() == null;
+
+  }
+
+  @Override public int hashCode() {
+    int result = (int) (getId() ^ (getId() >>> 32));
+    result = 31 * result + (getCompName() != null ? getCompName().hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public int compareTo(ComponentInstanceId to) {
+    int delta = this.getCompName().compareTo(to.getCompName());
+    if (delta == 0) {
+      return Long.compare(this.getId(), to.getId());
+    } else if (delta < 0) {
+      return -1;
+    } else {
+      return 1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
new file mode 100644
index 0000000..f5de5cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+public enum ComponentInstanceState {
+  INIT,
+  STARTED,
+  READY,
+  UPGRADING
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
new file mode 100644
index 0000000..6de2dc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+public interface RestApiConstants {
+
+  // Rest endpoints
+  String CONTEXT_ROOT = "/ws/v1";
+  String VERSION = "/services/version";
+  String SERVICE_ROOT_PATH = "/services";
+  String SERVICE_PATH = "/services/{service_name}";
+  String COMPONENT_PATH = "/services/{service_name}/components/{component_name}";
+
+  // Query param
+  String SERVICE_NAME = "service_name";
+  String COMPONENT_NAME = "component_name";
+
+  String DEFAULT_COMPONENT_NAME = "default";
+
+  String PROPERTY_REST_SERVICE_HOST = "REST_SERVICE_HOST";
+  String PROPERTY_REST_SERVICE_PORT = "REST_SERVICE_PORT";
+  Long DEFAULT_UNLIMITED_LIFETIME = -1L;
+
+  Integer ERROR_CODE_APP_DOES_NOT_EXIST = 404001;
+  Integer ERROR_CODE_APP_IS_NOT_RUNNING = 404002;
+  Integer ERROR_CODE_APP_SUBMITTED_BUT_NOT_RUNNING_YET = 404003;
+  Integer ERROR_CODE_APP_NAME_INVALID = 404004;
+}
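
As a hedged sketch of how these templates compose into a request URL
(the host "localhost" and the service name "sleeper" are assumptions, and
a real client would normally let JAX-RS resolve the path placeholder):

import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;

public class ServiceUrlDemo {
  public static void main(String[] args) {
    // Expand the {service_name} placeholder by hand for illustration.
    String path = SERVICE_PATH.replace("{" + SERVICE_NAME + "}", "sleeper");
    String url = "http://localhost:" + DEFAULT_API_SERVER_PORT
        + CONTEXT_ROOT + path;
    System.out.println(url); // http://localhost:9191/ws/v1/services/sleeper
  }
}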

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java
new file mode 100644
index 0000000..ee270cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+import org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes;
+
+public interface SliderExitCodes extends LauncherExitCodes {
+
+  /**
+   * starting point for exit codes; not an exception itself
+   */
+  int _EXIT_CODE_BASE =           64;
+
+  /**
+   * service entered the failed state: {@value}
+   */
+  int EXIT_YARN_SERVICE_FAILED =  65;
+
+  /**
+   * service was killed: {@value}
+   */
+  int EXIT_YARN_SERVICE_KILLED =  66;
+
+  /**
+   * timeout on monitoring client: {@value}
+   */
+  int EXIT_TIMED_OUT =            67;
+
+  /**
+   * service finished with an error: {@value}
+   */
+  int EXIT_YARN_SERVICE_FINISHED_WITH_ERROR = 68;
+
+  /**
+   * the service instance is unknown: {@value}
+   */
+  int EXIT_UNKNOWN_INSTANCE =     69;
+
+  /**
+   * the service instance is in the wrong state for that operation: {@value}
+   */
+  int EXIT_BAD_STATE =            70;
+
+  /**
+   * A spawned master process failed: {@value}
+   */
+  int EXIT_PROCESS_FAILED =       71;
+
+  /**
+   * The instance failed: too many containers were
+   * failing, or some other threshold was reached: {@value}
+   */
+  int EXIT_DEPLOYMENT_FAILED =    72;
+
+  /**
+   * The service is live, and the requested operation
+   * does not work while it is running: {@value}
+   */
+  int EXIT_APPLICATION_IN_USE =   73;
+
+  /**
+   * A service instance of that name already exists
+   * when an attempt is made to create a new instance: {@value}
+   */
+  int EXIT_INSTANCE_EXISTS =      75;
+
+  /**
+   * Exit code when the configuration is invalid or incomplete: {@value}
+   */
+  int EXIT_BAD_CONFIGURATION =    77;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
new file mode 100644
index 0000000..1968e95
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+
+public class YarnServiceConf {
+
+  // Retry settings for the ServiceClient to talk to Service AppMaster
+  public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
+  public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
+
+  // Retry settings for container failures
+  public static final String CONTAINER_RETRY_MAX = "yarn.service.container-failure.retry.max";
+  public static final String CONTAINER_RETRY_INTERVAL = "yarn.service.container-failure.retry-interval";
+
+  public static final String AM_RESTART_MAX = "yarn.service.am-restart.max-attempts";
+  public static final String AM_RESOURCE_MEM = "yarn.service.am-resource.memory";
+  public static final long DEFAULT_KEY_AM_RESOURCE_MEM = 1024;
+
+  public static final String YARN_QUEUE = "yarn.service.queue";
+
+  public static final String API_SERVER_ADDRESS = "yarn.service.api-server.address";
+  public static final String DEFAULT_API_SERVER_ADDRESS = "0.0.0.0:";
+  public static final int DEFAULT_API_SERVER_PORT = 9191;
+
+  /**
+   * The yarn service base path:
+   * Defaults to HomeDir/.yarn/
+   */
+  public static final String YARN_SERVICE_BASE_PATH = "yarn.service.base.path";
+
+  //TODO rename
+  /** Declare that a keytab must be provided */
+  public static final String KEY_AM_LOGIN_KEYTAB_REQUIRED = "slider.am.login.keytab.required";
+  public static final String KEY_AM_LOGIN_KEYTAB_NAME = "slider.am.login.keytab.name";
+  public static final String KEY_HDFS_KEYTAB_DIR = "slider.hdfs.keytab.dir";
+  public static final String KEY_AM_KEYTAB_LOCAL_PATH = "slider.am.keytab.local.path";
+
+  /**
+   * maximum number of failed containers (in a single component)
+   * before the app exits
+   */
+  public static final String CONTAINER_FAILURE_THRESHOLD =
+      "yarn.service.container-failure-per-component.threshold";
+  /**
+   * Maximum number of container failures on a node before the node is blacklisted
+   */
+  public static final String NODE_BLACKLIST_THRESHOLD =
+      "yarn.service.node-blacklist.threshold";
+
+  /**
+   * The failure counts for CONTAINER_FAILURE_THRESHOLD and NODE_BLACKLIST_THRESHOLD
+   * are reset periodically; the window length is in seconds.
+   */
+  public static final String CONTAINER_FAILURE_WINDOW =
+      "yarn.service.failure-count-reset.window";
+
+  /**
+   * Interval between readiness checks, in seconds.
+   */
+  public static final String READINESS_CHECK_INTERVAL = "yarn.service.readiness-check-interval.seconds";
+  public static final int DEFAULT_READINESS_CHECK_INTERVAL = 30; // seconds
+
+  /**
+   * Get long value for the property. First get from the userConf, if not
+   * present, get from systemConf.
+   *
+   * @param name name of the property
+   * @param defaultValue default value of the property, if it is not defined in
+   *                     userConf and systemConf.
+   * @param userConf Configuration provided by client in the JSON definition
+   * @param systemConf The YarnConfiguration in the system.
+   * @return long value for the property
+   */
+  public static long getLong(String name, long defaultValue,
+      Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
+    return userConf.getPropertyLong(name, systemConf.getLong(name, defaultValue));
+  }
+
+  public static int getInt(String name, int defaultValue,
+      Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
+    return userConf.getPropertyInt(name, systemConf.getInt(name, defaultValue));
+  }
+}
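
A minimal sketch of the lookup order implemented by getInt/getLong above
(it assumes a freshly constructed api-records Configuration simply has no
properties set, so the per-service value is absent and the system value
wins over the hard-coded default):

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;

public class ConfLookupDemo {
  public static void main(String[] args) {
    // Cluster-side configuration, e.g. what yarn-site.xml would provide.
    YarnConfiguration systemConf = new YarnConfiguration();
    systemConf.setInt(YarnServiceConf.CONTAINER_FAILURE_THRESHOLD, 20);

    // Per-service configuration from the JSON definition; empty here.
    Configuration userConf = new Configuration();

    int threshold = YarnServiceConf.getInt(
        YarnServiceConf.CONTAINER_FAILURE_THRESHOLD, 10, userConf, systemConf);
    System.out.println(threshold); // 20: userConf is empty, systemConf wins
  }
}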

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
new file mode 100644
index 0000000..e5ed703
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+public interface YarnServiceConstants {
+
+  /**
+   * The path under which cluster and temp data are stored
+   */
+  String SERVICE_BASE_DIRECTORY = ".yarn";
+
+  /**
+   * The paths under which Service AM dependency libraries are stored
+   */
+  String DEPENDENCY_LOCALIZED_DIR_LINK = "service_dep";
+  String DEPENDENCY_DIR = "/yarn-services/%s/";
+  String DEPENDENCY_TAR_GZ_FILE_NAME = "service-dep";
+  String DEPENDENCY_TAR_GZ_FILE_EXT = ".tar.gz";
+  String DEPENDENCY_DIR_PERMISSIONS = "755";
+
+  /**
+   * Service type for YARN service
+   */
+  String APP_TYPE = "yarn-service";
+
+  String KEYTAB_DIR = "keytabs";
+  String RESOURCE_DIR = "resources";
+
+
+  String SERVICES_DIRECTORY = "services";
+
+  /**
+   * JVM property to define the service lib directory;
+   * this is set by the yarn.sh script
+   */
+  String PROPERTY_LIB_DIR = "service.libdir";
+
+  /**
+   * name of generated dir for this conf
+   */
+  String SUBMITTED_CONF_DIR = "conf";
+
+  /**
+   * Service AM log4j file name
+   */
+  String YARN_SERVICE_LOG4J_FILENAME = "yarnservice-log4j.properties";
+
+  /**
+   * Log4j sysprop to name the resource
+   */
+  String SYSPROP_LOG4J_CONFIGURATION = "log4j.configuration";
+
+  /**
+   * sysprop for Service AM log4j directory
+   */
+  String SYSPROP_LOG_DIR = "LOG_DIR";
+
+  String TMP_DIR_PREFIX = "tmp";
+
+
+  String SERVICE_CORE_JAR = "yarn-service-core.jar";
+
+  String STDOUT_AM = "serviceam-out.txt";
+  String STDERR_AM = "serviceam-err.txt";
+
+  String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+
+  String APP_CONF_DIR = "conf";
+
+  String APP_LIB_DIR = "lib";
+
+  String OUT_FILE = "stdout.txt";
+  String ERR_FILE = "stderr.txt";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
new file mode 100644
index 0000000..e4eae20
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.containerlaunch;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.apache.hadoop.yarn.service.provider.docker.DockerKeys.DEFAULT_DOCKER_NETWORK;
+
+/**
+ * Base class for launchers: builds up a container launch context.
+ */
+public class AbstractLauncher {
+  private static final Logger log =
+    LoggerFactory.getLogger(AbstractLauncher.class);
+  public static final String CLASSPATH = "CLASSPATH";
+  /**
+   * Filesystem to use for the launch
+   */
+  protected final CoreFileSystem coreFileSystem;
+  /**
+   * Env vars; set up at final launch stage
+   */
+  protected final Map<String, String> envVars = new HashMap<>();
+  protected final ContainerLaunchContext containerLaunchContext =
+    Records.newRecord(ContainerLaunchContext.class);
+  protected final List<String> commands = new ArrayList<>(20);
+  protected final Map<String, LocalResource> localResources = new HashMap<>();
+  protected final Map<String, String> mountPaths = new HashMap<>();
+  private final Map<String, ByteBuffer> serviceData = new HashMap<>();
+  // security
+  protected final Credentials credentials;
+  protected boolean yarnDockerMode = false;
+  protected String dockerImage;
+  protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
+  protected String dockerHostname;
+  protected String runPrivilegedContainer;
+
+
+  /**
+   * Create instance.
+   * @param coreFileSystem filesystem
+   * @param credentials initial set of credentials; null is permitted
+   */
+  public AbstractLauncher(
+      CoreFileSystem coreFileSystem,
+      Credentials credentials) {
+    this.coreFileSystem = coreFileSystem;
+    this.credentials = credentials != null ? credentials: new Credentials();
+  }
+  
+  public void setYarnDockerMode(boolean yarnDockerMode){
+    this.yarnDockerMode = yarnDockerMode;
+  }
+
+  /**
+   * Get the env vars to work on
+   * @return env vars
+   */
+  public Map<String, String> getEnv() {
+    return envVars;
+  }
+
+  /**
+   * Get the launch commands.
+   * @return the live list of commands 
+   */
+  public List<String> getCommands() {
+    return commands;
+  }
+
+  public void addLocalResource(String subPath, LocalResource resource) {
+    localResources.put(subPath, resource);
+  }
+
+  public void addLocalResource(String subPath, LocalResource resource, String mountPath) {
+    localResources.put(subPath, resource);
+    mountPaths.put(subPath, mountPath);
+  }
+
+  /**
+   * Accessor to the credentials
+   * @return the credentials associated with this launcher
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+
+  public void addCommand(String cmd) {
+    commands.add(cmd);
+  }
+
+  /**
+   * Complete the launch context (copy in env vars, etc).
+   * @return the container to launch
+   */
+  public ContainerLaunchContext completeContainerLaunch() throws IOException {
+    
+    String cmdStr = SliderUtils.join(commands, " ", false);
+    log.debug("Completed setting up container command {}", cmdStr);
+    containerLaunchContext.setCommands(commands);
+
+    //env variables
+    if (log.isDebugEnabled()) {
+      log.debug("Environment variables");
+      for (Map.Entry<String, String> envPair : envVars.entrySet()) {
+        log.debug("    \"{}\"=\"{}\"", envPair.getKey(), envPair.getValue());
+      }
+    }    
+    containerLaunchContext.setEnvironment(envVars);
+
+    //service data
+    if (log.isDebugEnabled()) {
+      log.debug("Service Data size");
+      for (Map.Entry<String, ByteBuffer> entry : serviceData.entrySet()) {
+        log.debug("\"{}\"=> {} bytes of data", entry.getKey(),
+            entry.getValue().array().length);
+      }
+    }
+    containerLaunchContext.setServiceData(serviceData);
+
+    // resources
+    dumpLocalResources();
+    containerLaunchContext.setLocalResources(localResources);
+
+    //tokens
+    log.debug("{} tokens", credentials.numberOfTokens());
+    containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
+        credentials));
+
+    if(yarnDockerMode){
+      Map<String, String> env = containerLaunchContext.getEnvironment();
+      env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
+      env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);
+      env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", dockerNetwork);
+      env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_HOSTNAME",
+          dockerHostname);
+      env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER", runPrivilegedContainer);
+      StringBuilder sb = new StringBuilder();
+      for (Entry<String,String> mount : mountPaths.entrySet()) {
+        if (sb.length() > 0) {
+          sb.append(",");
+        }
+        sb.append(mount.getKey());
+        sb.append(":");
+        sb.append(mount.getValue());
+      }
+      env.put("YARN_CONTAINER_RUNTIME_DOCKER_LOCAL_RESOURCE_MOUNTS", sb.toString());
+      log.info("yarn docker env var has been set {}", containerLaunchContext.getEnvironment().toString());
+    }
+
+    return containerLaunchContext;
+  }
+
+  public void setRetryContext(int maxRetries, int retryInterval) {
+    ContainerRetryContext retryContext = ContainerRetryContext
+        .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, maxRetries,
+            retryInterval);
+    containerLaunchContext.setContainerRetryContext(retryContext);
+  }
+
+  /**
+   * Dump local resources at debug level
+   */
+  private void dumpLocalResources() {
+    if (log.isDebugEnabled()) {
+      log.debug("{} resources: ", localResources.size());
+      for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+
+        String key = entry.getKey();
+        LocalResource val = entry.getValue();
+        log.debug(key + "=" + SliderUtils.stringify(val.getResource()));
+      }
+    }
+  }
+
+  /**
+   * This is critical for an insecure cluster: it passes
+   * the username down to YARN, and so gives the code running
+   * in containers the rights it needs to work with
+   * data.
+   * @throws IOException problems working with current user
+   */
+  protected void propagateUsernameInInsecureCluster() throws IOException {
+    //insecure cluster: propagate user name via env variable
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
+    envVars.put(YarnServiceConstants.HADOOP_USER_NAME, userName);
+  }
+
+  /**
+   * Utility method to set up the classpath
+   * @param classpath classpath to use
+   */
+  public void setClasspath(ClasspathConstructor classpath) {
+    setEnv(CLASSPATH, classpath.buildClasspath());
+  }
+
+  /**
+   * Set an environment variable in the launch context
+   * @param var variable name
+   * @param value value (must be non null)
+   */
+  public void setEnv(String var, String value) {
+    Preconditions.checkArgument(var != null, "null variable name");
+    Preconditions.checkArgument(value != null, "null value");
+    envVars.put(var, value);
+  }
+
+
+  public void putEnv(Map<String, String> map) {
+    envVars.putAll(map);
+  }
+
+
+  public void setDockerImage(String dockerImage) {
+    this.dockerImage = dockerImage;
+  }
+
+  public void setDockerNetwork(String dockerNetwork) {
+    this.dockerNetwork = dockerNetwork;
+  }
+
+  public void setDockerHostname(String dockerHostname) {
+    this.dockerHostname = dockerHostname;
+  }
+
+  public void setRunPrivilegedContainer(boolean runPrivilegedContainer) {
+    this.runPrivilegedContainer = Boolean.toString(runPrivilegedContainer);
+  }
+
+}
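
As a rough usage sketch of the launcher above (the CoreFileSystem is assumed
to be built elsewhere from the cluster configuration, and the docker image,
hostname and command are purely illustrative):

import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.utils.CoreFileSystem;

public class LaunchSketch {
  static ContainerLaunchContext buildContext(CoreFileSystem fs)
      throws IOException {
    // Null credentials are permitted; the launcher substitutes an empty set.
    AbstractLauncher launcher = new AbstractLauncher(fs, null);
    launcher.setYarnDockerMode(true);
    launcher.setDockerImage("centos/httpd-24-centos7");
    launcher.setDockerNetwork("bridge");
    launcher.setDockerHostname("httpd-0.example.com");
    launcher.setRunPrivilegedContainer(false);
    launcher.setEnv("LANG", "en_US.UTF-8");
    launcher.addCommand("/usr/bin/run-httpd");
    launcher.setRetryContext(3, 30000); // retry interval assumed to be in ms
    // Copies commands, env vars, local resources and tokens into the record.
    return launcher.completeContainerLaunch();
  }
}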

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
new file mode 100644
index 0000000..22b3877
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.containerlaunch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds a classpath: allows entries to be injected in front of the
+ * YARN classpath as well as behind it, adds the appropriate separators,
+ * and supports extraction of the local classpath, etc.
+ */
+public class ClasspathConstructor {
+
+  public static final String CLASS_PATH_SEPARATOR = ApplicationConstants.CLASS_PATH_SEPARATOR;
+  private final List<String> pathElements = new ArrayList<>();
+
+  public ClasspathConstructor() {
+  }
+
+
+  /**
+   * Get the list of JARs from the YARN settings
+   * @param config configuration
+   */
+  public List<String> yarnApplicationClasspath(Configuration config) {
+    String[] cp = config.getTrimmedStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH);
+    return cp != null ? Arrays.asList(cp) : new ArrayList<String>(0);
+
+  }
+
+
+  @Override
+  public String toString() {
+    return buildClasspath();
+  }
+
+  public String buildClasspath() {
+    return SliderUtils.join(pathElements,
+        CLASS_PATH_SEPARATOR,
+        false);
+  }
+
+  /**
+   * Get a copy of the path list
+   * @return the JARs
+   */
+  public List<String> getPathElements() {
+    return Collections.unmodifiableList(pathElements);
+  }
+
+  /**
+   * Append an entry
+   * @param path path
+   */
+  public void append(String path) {
+    pathElements.add(path);
+  }
+
+  /**
+   * Insert a path at the front of the list. This places it ahead of
+   * the standard YARN artifacts
+   * @param path path to the JAR; absolute or relative, as resolved on the
+   * target system
+   */
+  public void insert(String path) {
+    pathElements.add(0, path);
+  }
+
+  public void appendAll(Collection<String> paths) {
+    pathElements.addAll(paths);
+  }
+
+  public void insertAll(Collection<String> paths) {
+    pathElements.addAll(0, paths);
+  }
+
+
+  public void addLibDir(String pathToLibDir) {
+    append(buildLibDir(pathToLibDir));
+  }
+
+  public void insertLibDir(String pathToLibDir) {
+    insert(buildLibDir(pathToLibDir));
+  }
+
+  public void addClassDirectory(String pathToDir) {
+    append(appendDirectoryTerminator(pathToDir));
+  }
+
+  public void insertClassDirectory(String pathToDir) {
+    insert(appendDirectoryTerminator(pathToDir));
+  }
+
+
+  public void addRemoteClasspathEnvVar() {
+    append(ApplicationConstants.Environment.CLASSPATH.$$());
+  }
+
+
+  public void insertRemoteClasspathEnvVar() {
+    insert(ApplicationConstants.Environment.CLASSPATH.$$());
+  }
+
+
+  /**
+   * Build a lib dir path
+   * @param pathToLibDir path to the directory; may or may not end with a
+   * trailing slash
+   * @return a path to a lib dir that is compatible with the java classpath
+   */
+  public String buildLibDir(String pathToLibDir) {
+    String dir = appendDirectoryTerminator(pathToLibDir);
+    dir += "*";
+    return dir;
+  }
+
+  private String appendDirectoryTerminator(String pathToLibDir) {
+    String dir = pathToLibDir.trim();
+    if (!dir.endsWith("/")) {
+      dir += "/";
+    }
+    return dir;
+  }
+
+  /**
+   * Split a classpath. This uses the local path separator so MUST NOT
+   * be used to work with remote classpaths
+   * @param localpath local path
+   * @return the split-up entries
+   */
+  public Collection<String> splitClasspath(String localpath) {
+    String separator = System.getProperty("path.separator");
+    return StringUtils.getStringCollection(localpath, separator);
+  }
+
+  /**
+   * Get the local JVM classpath split up
+   * @return the list of entries on the JVM classpath env var
+   */
+  public Collection<String> localJVMClasspath() {
+    return splitClasspath(System.getProperty("java.class.path"));
+  }
+
+}
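
A short sketch of assembling a classpath with the helper above (the "conf"
and "lib" paths are illustrative relative paths on the target node):

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.containerlaunch.ClasspathConstructor;

public class ClasspathDemo {
  public static void main(String[] args) {
    ClasspathConstructor cp = new ClasspathConstructor();
    // The conf directory goes first so it shadows anything in the YARN jars.
    cp.insertClassDirectory("conf");
    // The service's own lib directory, expanded to lib/*.
    cp.addLibDir("lib");
    // Finally the cluster-provided classpath env var, resolved on the node,
    // plus the yarn.application.classpath entries from the configuration.
    cp.addRemoteClasspathEnvVar();
    cp.appendAll(cp.yarnApplicationClasspath(new YarnConfiguration()));
    System.out.println(cp.buildClasspath());
  }
}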


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org