You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/05/17 21:19:48 UTC

[2/2] hadoop git commit: YARN-8080. Add restart policy for YARN services. Contributed by Suma Shivaprasad

YARN-8080.  Add restart policy for YARN services.
            Contributed by Suma Shivaprasad


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f083ed8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f083ed8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f083ed8

Branch: refs/heads/trunk
Commit: 7f083ed8699a720d3fb82e4ec310356902a6ac30
Parents: 7802af6
Author: Eric Yang <ey...@apache.org>
Authored: Thu May 17 17:16:50 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Thu May 17 17:16:50 2018 -0400

----------------------------------------------------------------------
 ...RN-Simplified-V1-API-Layer-For-Services.yaml |   8 +
 .../hadoop/yarn/service/ClientAMService.java    |   7 +-
 .../hadoop/yarn/service/ServiceContext.java     |   4 +
 .../hadoop/yarn/service/ServiceManager.java     |  25 +-
 .../hadoop/yarn/service/ServiceScheduler.java   |  73 ++-
 .../yarn/service/api/records/Component.java     |  71 +++
 .../service/component/AlwaysRestartPolicy.java  |  82 ++++
 .../yarn/service/component/Component.java       | 209 ++++++--
 .../component/ComponentRestartPolicy.java       |  45 ++
 .../service/component/NeverRestartPolicy.java   |  82 ++++
 .../component/OnFailureRestartPolicy.java       |  87 ++++
 .../component/instance/ComponentInstance.java   |  91 +++-
 .../hadoop/yarn/service/utils/ServiceUtils.java |  18 +
 .../hadoop/yarn/service/ServiceTestUtils.java   |  46 +-
 .../hadoop/yarn/service/TestServiceManager.java |   6 +-
 .../yarn/service/component/TestComponent.java   |  99 +++-
 .../component/TestComponentRestartPolicy.java   | 130 +++++
 .../instance/TestComponentInstance.java         | 484 ++++++++++++++++++-
 .../markdown/yarn-service/YarnServiceAPI.md     |   2 +
 19 files changed, 1447 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
index cea8296..d90ae06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
@@ -424,6 +424,14 @@ definitions:
         items:
           type: string
         description: A list of quicklink keys defined at the service level, and to be resolved by this component.
+      restartPolicy:
+        type: string
+        description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)
+        enum:
+          - ALWAYS
+          - ON_FAILURE
+          - NEVER
+        default: ALWAYS
   ReadinessCheck:
     description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases.
     required:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index d5d6fa4..e97c3d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
@@ -130,7 +131,7 @@ public class ClientAMService extends AbstractService
     LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
     context.scheduler.getDiagnostics()
         .append("Stopped by user " + UserGroupInformation.getCurrentUser());
-    context.scheduler.setGracefulStop();
+    context.scheduler.setGracefulStop(FinalApplicationStatus.ENDED);
 
     // Stop the service in 2 seconds delay to make sure this rpc call is completed.
     // shutdown hook will be executed which will stop AM gracefully.
@@ -157,10 +158,10 @@ public class ClientAMService extends AbstractService
   public UpgradeServiceResponseProto upgrade(
       UpgradeServiceRequestProto request) throws IOException {
     try {
-      context.getServiceManager().processUpgradeRequest(request.getVersion(),
-          request.getAutoFinalize());
       LOG.info("Upgrading service to version {} by {}", request.getVersion(),
           UserGroupInformation.getCurrentUser());
+      context.getServiceManager().processUpgradeRequest(request.getVersion(),
+          request.getAutoFinalize());
       return UpgradeServiceResponseProto.newBuilder().build();
     } catch (Exception ex) {
       return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
index 6c91b9c..8779153 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
@@ -56,4 +56,8 @@ public class ServiceContext {
   void setServiceManager(ServiceManager serviceManager) {
     this.serviceManager = Preconditions.checkNotNull(serviceManager);
   }
+
+  public Service getService() {
+    return service;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index e6a38dc..05ecb3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
+import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -266,12 +267,24 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
         event.setAutoFinalize(true);
       }
       compsThatNeedUpgrade.forEach(component -> {
-        ComponentEvent needUpgradeEvent = new ComponentEvent(
-            component.getName(), ComponentEventType.UPGRADE)
-            .setTargetSpec(component)
-            .setUpgradeVersion(event.getVersion());
-        context.scheduler.getDispatcher().getEventHandler().handle(
-            needUpgradeEvent);
+        org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
+            restartPolicy = component.getRestartPolicy();
+
+        final ComponentRestartPolicy restartPolicyHandler =
+            Component.getRestartPolicyHandler(restartPolicy);
+        // Do not allow upgrades for components which have NEVER/ON_FAILURE
+        // restart policy
+        if (restartPolicyHandler.allowUpgrades()) {
+          ComponentEvent needUpgradeEvent = new ComponentEvent(
+              component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
+              component).setUpgradeVersion(event.getVersion());
+          context.scheduler.getDispatcher().getEventHandler().handle(
+              needUpgradeEvent);
+        } else {
+          LOG.info("The component {} has a restart "
+              + "policy that doesnt allow upgrades {} ", component.getName(),
+              component.getRestartPolicy().toString());
+        }
       });
     } else {
       // nothing to upgrade if upgrade auto finalize is requested, trigger a

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index ee0a1a7..d3e8e4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
 import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
+import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.util.BoundedAppender;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.slf4j.Logger;
@@ -89,8 +91,10 @@ import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -101,6 +105,10 @@ import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
 import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
 import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
+    .EXIT_FALSE;
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
+    .EXIT_SUCCESS;
 
 /**
  *
@@ -158,8 +166,15 @@ public class ServiceScheduler extends CompositeService {
 
   private boolean gracefulStop = false;
 
+  private volatile FinalApplicationStatus finalApplicationStatus =
+      FinalApplicationStatus.ENDED;
+
+  // For unit test override since we don't want to terminate UT process.
+  private ServiceUtils.ProcessTerminationHandler
+      terminationHandler = new ServiceUtils.ProcessTerminationHandler();
+
   public ServiceScheduler(ServiceContext context) {
-    super(context.service.getName());
+    super(context.getService().getName());
     this.context = context;
   }
 
@@ -256,8 +271,9 @@ public class ServiceScheduler extends CompositeService {
         .createAMRMClientAsync(1000, new AMRMClientCallback());
   }
 
-  protected void setGracefulStop() {
+  public void setGracefulStop(FinalApplicationStatus applicationStatus) {
     this.gracefulStop = true;
+    this.finalApplicationStatus = applicationStatus;
     nmClient.getClient().cleanupRunningContainersOnStop(true);
   }
 
@@ -877,4 +893,57 @@ public class ServiceScheduler extends CompositeService {
   public boolean hasAtLeastOnePlacementConstraint() {
     return hasAtLeastOnePlacementConstraint;
   }
+
+  /*
+* Check if all components of the scheduler finished.
+* If all components finished
+*   (which #failed-instances + #suceeded-instances = #total-n-containers)
+* The service will be terminated.
+*/
+  public synchronized void terminateServiceIfAllComponentsFinished() {
+    boolean shouldTerminate = true;
+
+    // Succeeded comps and failed comps, for logging purposes.
+    Set<String> succeededComponents = new HashSet<>();
+    Set<String> failedComponents = new HashSet<>();
+
+    for (Component comp : getAllComponents().values()) {
+      ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
+      if (!restartPolicy.shouldTerminate(comp)) {
+        shouldTerminate = false;
+        break;
+      }
+
+      long nFailed = comp.getNumFailedInstances();
+
+      if (nFailed > 0) {
+        failedComponents.add(comp.getName());
+      } else{
+        succeededComponents.add(comp.getName());
+      }
+    }
+
+    if (shouldTerminate) {
+      LOG.info("All component finished, exiting Service Master... "
+          + ", final status=" + (failedComponents.isEmpty() ?
+          "Succeeded" :
+          "Failed"));
+      LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils
+          .join(succeededComponents, ",") + "]");
+      LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
+          .join(failedComponents, ",") + "]");
+
+      if (failedComponents.isEmpty()) {
+        setGracefulStop(FinalApplicationStatus.SUCCEEDED);
+        getTerminationHandler().terminate(EXIT_SUCCESS);
+      } else{
+        setGracefulStop(FinalApplicationStatus.FAILED);
+        getTerminationHandler().terminate(EXIT_FALSE);
+      }
+    }
+  }
+
+  public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
+    return terminationHandler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
index 7deb076..0481123 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.yarn.service.api.records;
 
+import com.fasterxml.jackson.annotation.JsonValue;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
@@ -29,7 +30,9 @@ import java.util.Objects;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlEnum;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -98,6 +101,74 @@ public class Component implements Serializable {
   private List<Container> containers =
       Collections.synchronizedList(new ArrayList<Container>());
 
+
+  @JsonProperty("restart_policy")
+  @XmlElement(name = "restart_policy")
+  private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS;
+
+  /**
+   * Policy of restart component. Including ALWAYS - Long lived components
+   * (Always restart component instance even if instance exit code &#x3D; 0.);
+   *
+   * ON_FAILURE (Only restart component instance if instance exit code !&#x3D;
+   * 0);
+   * NEVER (Do not restart in any cases)
+   *
+   * @return restartPolicy
+   **/
+  @XmlType(name = "restart_policy")
+  @XmlEnum
+  public enum RestartPolicyEnum {
+    ALWAYS("ALWAYS"),
+
+    ON_FAILURE("ON_FAILURE"),
+
+    NEVER("NEVER");
+    private String value;
+
+    RestartPolicyEnum(String value) {
+      this.value = value;
+    }
+
+    @Override
+    @JsonValue
+    public String toString() {
+      return value;
+    }
+  }
+
+  public Component restartPolicy(RestartPolicyEnum restartPolicyEnumVal) {
+    this.restartPolicy = restartPolicyEnumVal;
+    return this;
+  }
+
+  /**
+   * Policy of restart component.
+   *
+   * Including
+   * ALWAYS (Always restart component instance even if instance exit
+   * code &#x3D; 0);
+   *
+   * ON_FAILURE (Only restart component instance if instance exit code !&#x3D;
+   * 0);
+   *
+   * NEVER (Do not restart in any cases)
+   *
+   * @return restartPolicy
+   **/
+  @ApiModelProperty(value = "Policy of restart component. Including ALWAYS "
+      + "(Always restart component even if instance exit code = 0); "
+      + "ON_FAILURE (Only restart component if instance exit code != 0); "
+      + "NEVER (Do not restart in any cases)")
+  public RestartPolicyEnum getRestartPolicy() {
+    return restartPolicy;
+  }
+
+  public void setRestartPolicy(RestartPolicyEnum restartPolicy) {
+    this.restartPolicy = restartPolicy;
+  }
+
+
   /**
    * Name of the service component (mandatory).
    **/

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java
new file mode 100644
index 0000000..704ab14
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+
+/**
+ * Always restart policy allows for restarts for long live components which
+ * never terminate.
+ */
+public final class AlwaysRestartPolicy implements ComponentRestartPolicy {
+
+  private static AlwaysRestartPolicy INSTANCE = new AlwaysRestartPolicy();
+
+  private AlwaysRestartPolicy() {
+  }
+
+  public static AlwaysRestartPolicy getInstance() {
+    return INSTANCE;
+  }
+
+  @Override public boolean isLongLived() {
+    return true;
+  }
+
+  /**
+   * This is always false since these components never terminate
+   *
+   * @param component
+   * @return
+   */
+  @Override public boolean hasCompleted(Component component) {
+    return false;
+  }
+
+  /**
+   * This is always false since these components never terminate
+   *
+   * @param component
+   * @return
+   */
+  @Override public boolean hasCompletedSuccessfully(Component component) {
+    return false;
+  }
+
+  @Override public boolean shouldRelaunchInstance(
+      ComponentInstance componentInstance, ContainerStatus containerStatus) {
+    return true;
+  }
+
+  @Override public boolean isReadyForDownStream(Component dependentComponent) {
+    if (dependentComponent.getNumReadyInstances() < dependentComponent
+        .getNumDesiredInstances()) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override public boolean allowUpgrades() {
+    return true;
+  }
+
+  @Override public boolean shouldTerminate(Component component) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 7979c19..931877e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.yarn.service.component;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import static org.apache.hadoop.yarn.service.api.records.Component
+    .RestartPolicyEnum;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -111,6 +114,13 @@ public class Component implements EventHandler<ComponentEvent> {
   // The number of containers failed since last reset. This excludes preempted,
   // disk_failed containers etc. This will be reset to 0 periodically.
   public AtomicInteger currentContainerFailure = new AtomicInteger(0);
+
+  //succeeded and Failed instances are Populated only for RestartPolicyEnum
+  //.ON_FAILURE/NEVER
+  private Map<String, ComponentInstance> succeededInstances =
+      new ConcurrentHashMap<>();
+  private Map<String, ComponentInstance> failedInstances =
+      new ConcurrentHashMap<>();
   private boolean healthThresholdMonitorEnabled = false;
 
   private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
@@ -297,7 +307,7 @@ public class Component implements EventHandler<ComponentEvent> {
     @Override
     public ComponentState transition(Component component,
         ComponentEvent event) {
-      component.setDesiredContainers((int)event.getDesired());
+      component.setDesiredContainers((int) event.getDesired());
       if (!component.areDependenciesReady()) {
         LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
             + " satisfied.", component.getName());
@@ -402,11 +412,37 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  private static ComponentState checkIfStable(Component component) {
+  @VisibleForTesting
+  static ComponentState checkIfStable(Component component) {
+    if (component.getRestartPolicyHandler().isLongLived()) {
+      return updateStateForLongRunningComponents(component);
+    } else{
+      //NEVER/ON_FAILURE
+      return updateStateForTerminatingComponents(component);
+    }
+  }
+
+  private static ComponentState updateStateForTerminatingComponents(
+      Component component) {
+    if (component.getNumRunningInstances() + component
+        .getNumSucceededInstances() + component.getNumFailedInstances()
+        < component.getComponentSpec().getNumberOfContainers()) {
+      component.componentSpec.setState(
+          org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
+      return FLEXING;
+    } else{
+      component.componentSpec.setState(
+          org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
+      return STABLE;
+    }
+  }
+
+  private static ComponentState updateStateForLongRunningComponents(
+      Component component) {
     // if desired == running
     if (component.componentMetrics.containersReady.value() == component
-        .getComponentSpec().getNumberOfContainers() &&
-        component.numContainersThatNeedUpgrade.get() == 0) {
+        .getComponentSpec().getNumberOfContainers()
+        && component.numContainersThatNeedUpgrade.get() == 0) {
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
@@ -425,17 +461,41 @@ public class Component implements EventHandler<ComponentEvent> {
 
   // This method should be called whenever there is an increment or decrement
   // of a READY state container of a component
-  public static synchronized void checkAndUpdateComponentState(
+  //This should not matter for terminating components
+  private static synchronized void checkAndUpdateComponentState(
       Component component, boolean isIncrement) {
     org.apache.hadoop.yarn.service.api.records.ComponentState curState =
         component.componentSpec.getState();
-    if (isIncrement) {
-      // check if all containers are in READY state
-      if (component.numContainersThatNeedUpgrade.get() == 0 &&
-          component.componentMetrics.containersReady.value() ==
-              component.componentMetrics.containersDesired.value()) {
-        component.componentSpec.setState(
-            org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
+
+    if (component.getRestartPolicyHandler().isLongLived()) {
+      if (isIncrement) {
+        // check if all containers are in READY state
+        if (component.numContainersThatNeedUpgrade.get() == 0
+            && component.componentMetrics.containersReady.value()
+            == component.componentMetrics.containersDesired.value()) {
+          component.componentSpec.setState(
+              org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
+          if (curState != component.componentSpec.getState()) {
+            LOG.info("[COMPONENT {}] state changed from {} -> {}",
+                component.componentSpec.getName(), curState,
+                component.componentSpec.getState());
+          }
+          // component state change will trigger re-check of service state
+          component.context.getServiceManager().checkAndUpdateServiceState();
+        }
+      } else{
+        // container moving out of READY state could be because of FLEX down so
+        // still need to verify the count before changing the component state
+        if (component.componentMetrics.containersReady.value()
+            < component.componentMetrics.containersDesired.value()) {
+          component.componentSpec.setState(
+              org.apache.hadoop.yarn.service.api.records.ComponentState
+                  .FLEXING);
+        } else if (component.componentMetrics.containersReady.value()
+            == component.componentMetrics.containersDesired.value()) {
+          component.componentSpec.setState(
+              org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
+        }
         if (curState != component.componentSpec.getState()) {
           LOG.info("[COMPONENT {}] state changed from {} -> {}",
               component.componentSpec.getName(), curState,
@@ -445,44 +505,38 @@ public class Component implements EventHandler<ComponentEvent> {
         component.context.getServiceManager().checkAndUpdateServiceState();
       }
     } else {
-      // container moving out of READY state could be because of FLEX down so
-      // still need to verify the count before changing the component state
-      if (component.componentMetrics.containersReady
-          .value() < component.componentMetrics.containersDesired.value()) {
-        component.componentSpec.setState(
-            org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
-      } else if (component.componentMetrics.containersReady
-          .value() == component.componentMetrics.containersDesired.value()) {
-        component.componentSpec.setState(
-            org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
-      }
-      if (curState != component.componentSpec.getState()) {
-        LOG.info("[COMPONENT {}] state changed from {} -> {}",
-            component.componentSpec.getName(), curState,
-            component.componentSpec.getState());
-      }
       // component state change will trigger re-check of service state
       component.context.getServiceManager().checkAndUpdateServiceState();
     }
     // when the service is stable then the state of component needs to
     // transition to stable
-    component.dispatcher.getEventHandler().handle(new ComponentEvent(
-        component.getName(), ComponentEventType.CHECK_STABLE));
+    component.dispatcher.getEventHandler().handle(
+        new ComponentEvent(component.getName(),
+            ComponentEventType.CHECK_STABLE));
   }
 
   private static class ContainerCompletedTransition extends BaseTransition {
     @Override
     public void transition(Component component, ComponentEvent event) {
+
       component.updateMetrics(event.getStatus());
       component.dispatcher.getEventHandler().handle(
-          new ComponentInstanceEvent(event.getStatus().getContainerId(),
-              STOP).setStatus(event.getStatus()));
-      component.componentSpec.setState(
-          org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
-      if (component.context.service.getState().equals(ServiceState.STABLE)) {
-        component.getScheduler().getApp().setState(ServiceState.STARTED);
-        LOG.info("Service def state changed from {} -> {}",
-            ServiceState.STABLE, ServiceState.STARTED);
+          new ComponentInstanceEvent(event.getStatus().getContainerId(), STOP)
+              .setStatus(event.getStatus()));
+
+      ComponentRestartPolicy restartPolicy =
+          component.getRestartPolicyHandler();
+
+      if (restartPolicy.shouldRelaunchInstance(event.getInstance(),
+          event.getStatus())) {
+        component.componentSpec.setState(
+            org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
+
+        if (component.context.service.getState().equals(ServiceState.STABLE)) {
+          component.getScheduler().getApp().setState(ServiceState.STARTED);
+          LOG.info("Service def state changed from {} -> {}",
+              ServiceState.STABLE, ServiceState.STARTED);
+        }
       }
     }
   }
@@ -725,8 +779,6 @@ public class Component implements EventHandler<ComponentEvent> {
     componentMetrics.containersDesired.set(n);
   }
 
-
-
   private void updateMetrics(ContainerStatus status) {
     switch (status.getExitStatus()) {
     case SUCCESS:
@@ -753,7 +805,7 @@ public class Component implements EventHandler<ComponentEvent> {
       String host = scheduler.getLiveInstances().get(status.getContainerId())
           .getNodeId().getHost();
       failureTracker.incNodeFailure(host);
-      currentContainerFailure.getAndIncrement() ;
+      currentContainerFailure.getAndIncrement();
     }
   }
 
@@ -763,17 +815,18 @@ public class Component implements EventHandler<ComponentEvent> {
       return true;
     }
     for (String dependency : dependencies) {
-      Component dependentComponent =
-          scheduler.getAllComponents().get(dependency);
+      Component dependentComponent = scheduler.getAllComponents().get(
+          dependency);
       if (dependentComponent == null) {
         LOG.error("Couldn't find dependency {} for {} (should never happen)",
             dependency, getName());
         continue;
       }
-      if (dependentComponent.getNumReadyInstances() < dependentComponent
-          .getNumDesiredInstances()) {
+
+      if (!dependentComponent.isReadyForDownstream()) {
         LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
-                + " instances are ready.", getName(), dependency,
+                + " instances are ready or the dependent component has not "
+                + "completed ", getName(), dependency,
             dependentComponent.getNumReadyInstances(),
             dependentComponent.getNumDesiredInstances());
         return false;
@@ -782,6 +835,7 @@ public class Component implements EventHandler<ComponentEvent> {
     return true;
   }
 
+
   public Map<String, String> getDependencyHostIpTokens() {
     Map<String, String> tokens = new HashMap<>();
     List<String> dependencies = componentSpec.getDependencies();
@@ -955,4 +1009,67 @@ public class Component implements EventHandler<ComponentEvent> {
       boolean healthThresholdMonitorEnabled) {
     this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled;
   }
+
+  public Collection<ComponentInstance> getSucceededInstances() {
+    return succeededInstances.values();
+  }
+
+  public long getNumSucceededInstances() {
+    return succeededInstances.size();
+  }
+
+  public long getNumFailedInstances() {
+    return failedInstances.size();
+  }
+
+  public Collection<ComponentInstance> getFailedInstances() {
+    return failedInstances.values();
+  }
+
+  public synchronized void markAsSucceeded(ComponentInstance instance) {
+    removeFailedInstanceIfExists(instance);
+    succeededInstances.put(instance.getCompInstanceName(), instance);
+  }
+
+  public synchronized void markAsFailed(ComponentInstance instance) {
+    removeSuccessfulInstanceIfExists(instance);
+    failedInstances.put(instance.getCompInstanceName(), instance);
+  }
+
+  public boolean removeFailedInstanceIfExists(ComponentInstance instance) {
+    if (failedInstances.containsKey(instance.getCompInstanceName())) {
+      failedInstances.remove(instance.getCompInstanceName());
+      return true;
+    }
+    return false;
+  }
+
+  public boolean removeSuccessfulInstanceIfExists(ComponentInstance instance) {
+    if (succeededInstances.containsKey(instance.getCompInstanceName())) {
+      succeededInstances.remove(instance.getCompInstanceName());
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isReadyForDownstream() {
+    return getRestartPolicyHandler().isReadyForDownStream(this);
+  }
+
+  public static ComponentRestartPolicy getRestartPolicyHandler(
+      RestartPolicyEnum restartPolicyEnum) {
+
+    if (RestartPolicyEnum.NEVER == restartPolicyEnum) {
+      return NeverRestartPolicy.getInstance();
+    } else if (RestartPolicyEnum.ON_FAILURE == restartPolicyEnum) {
+      return OnFailureRestartPolicy.getInstance();
+    } else{
+      return AlwaysRestartPolicy.getInstance();
+    }
+  }
+
+  public ComponentRestartPolicy getRestartPolicyHandler() {
+    RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy();
+    return getRestartPolicyHandler(restartPolicyEnum);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java
new file mode 100644
index 0000000..23b0fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+
+/**
+ * Interface for Component Restart policies.
+ * Which is used to make decisions on termination/restart of components and
+ * their instances.
+ */
+public interface ComponentRestartPolicy {
+
+  boolean isLongLived();
+
+  boolean hasCompleted(Component component);
+
+  boolean hasCompletedSuccessfully(Component component);
+
+  boolean shouldRelaunchInstance(ComponentInstance componentInstance,
+      ContainerStatus containerStatus);
+
+  boolean isReadyForDownStream(Component component);
+
+  boolean allowUpgrades();
+
+  boolean shouldTerminate(Component component);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java
new file mode 100644
index 0000000..ace1f89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+
+/**
+ * Policy for components with instances that do not require/support a restart.
+ */
+public final class NeverRestartPolicy implements ComponentRestartPolicy {
+
+  private static NeverRestartPolicy INSTANCE = new NeverRestartPolicy();
+
+  private NeverRestartPolicy() {
+  }
+
+  public static NeverRestartPolicy getInstance() {
+    return INSTANCE;
+  }
+
+  @Override public boolean isLongLived() {
+    return false;
+  }
+
+  @Override public boolean hasCompleted(Component component) {
+    if (component.getNumSucceededInstances() + component.getNumFailedInstances()
+        < component.getNumDesiredInstances()) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override public boolean hasCompletedSuccessfully(Component component) {
+    if (component.getNumSucceededInstances() == component
+        .getNumDesiredInstances()) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override public boolean shouldRelaunchInstance(
+      ComponentInstance componentInstance, ContainerStatus containerStatus) {
+    return false;
+  }
+
+  @Override public boolean isReadyForDownStream(Component component) {
+    if (hasCompleted(component)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override public boolean allowUpgrades() {
+    return false;
+  }
+
+  @Override public boolean shouldTerminate(Component component) {
+    long nSucceeded = component.getNumSucceededInstances();
+    long nFailed = component.getNumFailedInstances();
+    if (nSucceeded + nFailed < component.getComponentSpec()
+        .getNumberOfContainers()) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java
new file mode 100644
index 0000000..39fba2a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+
+/**
+ * Policy for components that require restarts for instances on failure.
+ */
+public final class OnFailureRestartPolicy implements ComponentRestartPolicy {
+
+  private static OnFailureRestartPolicy INSTANCE = new OnFailureRestartPolicy();
+
+  private OnFailureRestartPolicy() {
+  }
+
+  public static OnFailureRestartPolicy getInstance() {
+    return INSTANCE;
+  }
+
+  @Override public boolean isLongLived() {
+    return false;
+  }
+
+  @Override public boolean hasCompleted(Component component) {
+    if (hasCompletedSuccessfully(component)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override public boolean hasCompletedSuccessfully(Component component) {
+    if (component.getNumSucceededInstances() == component
+        .getNumDesiredInstances()) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override public boolean shouldRelaunchInstance(
+      ComponentInstance componentInstance, ContainerStatus containerStatus) {
+
+    if (ComponentInstance.hasContainerFailed(containerStatus)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override public boolean isReadyForDownStream(Component component) {
+    if (hasCompletedSuccessfully(component)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override public boolean allowUpgrades() {
+    return false;
+  }
+
+  @Override public boolean shouldTerminate(Component component) {
+    long nSucceeded = component.getNumSucceededInstances();
+    if (nSucceeded < component.getComponentSpec().getNumberOfContainers()) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index a323649..529596d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service.component.instance;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
@@ -25,9 +26,9 @@ import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
+import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
 import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
@@ -96,8 +98,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
   // This container object is used for rest API query
   private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
 
+
   private static final StateMachineFactory<ComponentInstance,
-      ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>
+      ComponentInstanceState, ComponentInstanceEventType,
+      ComponentInstanceEvent>
       stateMachineFactory =
       new StateMachineFactory<ComponentInstance, ComponentInstanceState,
           ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
@@ -230,6 +234,47 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  @VisibleForTesting
+  static void handleComponentInstanceRelaunch(
+      ComponentInstance compInstance, ComponentInstanceEvent event) {
+    Component comp = compInstance.getComponent();
+
+    // Do we need to relaunch the service?
+    boolean hasContainerFailed = hasContainerFailed(event.getStatus());
+
+    ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
+
+    if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
+      // re-ask the failed container.
+      comp.requestContainers(1);
+      comp.reInsertPendingInstance(compInstance);
+      LOG.info(compInstance.getCompInstanceId()
+              + ": {} completed. Reinsert back to pending list and requested " +
+              "a new container." + System.lineSeparator() +
+              " exitStatus={}, diagnostics={}.",
+          event.getContainerId(), event.getStatus().getExitStatus(),
+          event.getStatus().getDiagnostics());
+    } else {
+      // When no relaunch, update component's #succeeded/#failed
+      // instances.
+      if (hasContainerFailed) {
+        comp.markAsFailed(compInstance);
+      } else {
+        comp.markAsSucceeded(compInstance);
+      }
+      LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
+          " succeeded" :
+          " failed") + " without retry, exitStatus=" + event.getStatus());
+      comp.getScheduler().terminateServiceIfAllComponentsFinished();
+    }
+  }
+
+  public static boolean hasContainerFailed(ContainerStatus containerStatus) {
+    //Mark conainer as failed if we cant get its exit status i.e null?
+    return containerStatus == null || containerStatus.getExitStatus() !=
+        ContainerExitStatus.SUCCESS;
+  }
+
   private static class ContainerStoppedTransition extends  BaseTransition {
     // whether the container failed before launched by AM or not.
     boolean failedBeforeLaunching = false;
@@ -244,9 +289,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
-      // re-ask the failed container.
+
       Component comp = compInstance.component;
-      comp.requestContainers(1);
       String containerDiag =
           compInstance.getCompInstanceId() + ": " + event.getStatus()
               .getDiagnostics();
@@ -259,7 +303,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         compInstance.component.decContainersReady(true);
       }
       compInstance.component.decRunningContainers();
-      boolean shouldExit = false;
+      // Should we fail (terminate) the service?
+      boolean shouldFailService = false;
+
+      final ServiceScheduler scheduler = comp.getScheduler();
       // Check if it exceeds the failure threshold, but only if health threshold
       // monitor is not enabled
       if (!comp.isHealthThresholdMonitorEnabled()
@@ -271,10 +318,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
             comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
         compInstance.diagnostics.append(exitDiag);
         // append to global diagnostics that will be reported to RM.
-        comp.getScheduler().getDiagnostics().append(containerDiag);
-        comp.getScheduler().getDiagnostics().append(exitDiag);
+        scheduler.getDiagnostics().append(containerDiag);
+        scheduler.getDiagnostics().append(exitDiag);
         LOG.warn(exitDiag);
-        shouldExit = true;
+        shouldFailService = true;
       }
 
       if (!failedBeforeLaunching) {
@@ -296,25 +343,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       }
 
       // remove the failed ContainerId -> CompInstance mapping
-      comp.getScheduler().removeLiveCompInstance(event.getContainerId());
+      scheduler.removeLiveCompInstance(event.getContainerId());
 
-      comp.reInsertPendingInstance(compInstance);
+      // According to component restart policy, handle container restart
+      // or finish the service (if all components finished)
+      handleComponentInstanceRelaunch(compInstance, event);
 
-      LOG.info(compInstance.getCompInstanceId()
-              + ": {} completed. Reinsert back to pending list and requested " +
-              "a new container." + System.lineSeparator() +
-              " exitStatus={}, diagnostics={}.",
-          event.getContainerId(), event.getStatus().getExitStatus(),
-          event.getStatus().getDiagnostics());
-      if (shouldExit) {
-        // Sleep for 5 seconds in hope that the state can be recorded in ATS.
-        // in case there's a client polling the comp state, it can be notified.
-        try {
-          Thread.sleep(5000);
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted on sleep while exiting.", e);
-        }
-        ExitUtil.terminate(-1);
+      if (shouldFailService) {
+        scheduler.getTerminationHandler().terminate(-1);
       }
     }
   }
@@ -630,4 +666,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         >>> 32));
     return result;
   }
+
+  @VisibleForTesting public org.apache.hadoop.yarn.service.api.records
+      .Container getContainerSpec() {
+    return containerSpec;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
index 915b836..707bbf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
@@ -571,4 +572,21 @@ public final class ServiceUtils {
     // Fallback to querying the default hostname as we did before.
     return InetAddress.getLocalHost().getCanonicalHostName();
   }
+
+  /**
+   * Process termination handler - exist with specified exit code after
+   * waiting a while for ATS state to be in sync.
+   */
+  public static class ProcessTerminationHandler {
+    public void terminate(int exitCode) {
+      // Sleep for 5 seconds in hope that the state can be recorded in ATS.
+      // in case there's a client polling the comp state, it can be notified.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        log.info("Interrupted on sleep while exiting.", e);
+      }
+      ExitUtil.terminate(exitCode);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 599b8a7..86b4cea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -57,6 +57,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
@@ -99,8 +101,32 @@ public class ServiceTestUtils {
     return exampleApp;
   }
 
+  // Example service definition
+  // 2 components, each of which has 2 containers.
+  public static Service createTerminatingJobExample(String serviceName) {
+    Service exampleApp = new Service();
+    exampleApp.setName(serviceName);
+    exampleApp.setVersion("v1");
+    exampleApp.addComponent(
+        createComponent("terminating-comp1", 2, "sleep " + "1000",
+            Component.RestartPolicyEnum.NEVER, null));
+    exampleApp.addComponent(
+        createComponent("terminating-comp2", 2, "sleep 1000",
+            Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
+                add("terminating-comp1");
+            }}));
+    exampleApp.addComponent(
+        createComponent("terminating-comp3", 2, "sleep 1000",
+            Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
+                add("terminating-comp2");
+            }}));
+
+    return exampleApp;
+  }
+
   public static Component createComponent(String name) {
-    return createComponent(name, 2L, "sleep 1000");
+    return createComponent(name, 2L, "sleep 1000",
+        Component.RestartPolicyEnum.ALWAYS, null);
   }
 
   protected static Component createComponent(String name, long numContainers,
@@ -116,6 +142,18 @@ public class ServiceTestUtils {
     return comp1;
   }
 
+  protected static Component createComponent(String name, long numContainers,
+      String command, Component.RestartPolicyEnum restartPolicyEnum,
+      List<String> dependencies) {
+    Component comp = createComponent(name, numContainers, command);
+    comp.setRestartPolicy(restartPolicyEnum);
+
+    if (dependencies != null) {
+      comp.dependencies(dependencies);
+    }
+    return comp;
+  }
+
   public static SliderFileSystem initMockFs() throws IOException {
     return initMockFs(null);
   }
@@ -306,6 +344,12 @@ public class ServiceTestUtils {
     return client;
   }
 
+  public static ServiceManager createServiceManager(ServiceContext context) {
+    ServiceManager serviceManager = new ServiceManager(context);
+    context.setServiceManager(serviceManager);
+    return serviceManager;
+  }
+
   /**
    * Creates a YarnClient for test purposes.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
index 56a0c71..fc509f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -227,14 +227,16 @@ public class TestServiceManager {
   }
 
   public static Service createBaseDef(String name) {
+    return createDef(name, ServiceTestUtils.createExampleApplication());
+  }
+
+  public static Service createDef(String name, Service serviceDef) {
     ApplicationId applicationId = ApplicationId.newInstance(
         System.currentTimeMillis(), 1);
-    Service serviceDef = ServiceTestUtils.createExampleApplication();
     serviceDef.setId(applicationId.toString());
     serviceDef.setName(name);
     serviceDef.setState(ServiceState.STARTED);
     Artifact artifact = createTestArtifact("1");
-
     serviceDef.getComponents().forEach(component ->
         component.setArtifact(artifact));
     return serviceDef;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
index 600e438..d7c15ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
+
 import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -50,6 +52,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
+
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -60,6 +63,9 @@ import static org.mockito.Mockito.when;
  */
 public class TestComponent {
 
+  private static final int WAIT_MS_PER_LOOP = 1000;
+  static final Logger LOG = Logger.getLogger(TestComponent.class);
+
   @Rule
   public ServiceTestUtils.ServiceFSWatcher rule =
       new ServiceTestUtils.ServiceFSWatcher();
@@ -158,6 +164,57 @@ public class TestComponent {
         comp.getComponentSpec().getConfiguration().getEnv("key1"));
   }
 
+  @Test
+  public void testComponentStateUpdatesWithTerminatingComponents() throws
+      Exception {
+    final String serviceName =
+        "testComponentStateUpdatesWithTerminatingComponents";
+
+    Service testService = ServiceTestUtils.createTerminatingJobExample(
+        serviceName);
+    TestServiceManager.createDef(serviceName, testService);
+
+    ServiceContext context = createTestContext(rule, testService);
+
+    for (Component comp : context.scheduler.getAllComponents().values()) {
+
+      Iterator<ComponentInstance> instanceIter = comp.
+          getAllComponentInstances().iterator();
+
+      ComponentInstance componentInstance = instanceIter.next();
+      Container instanceContainer = componentInstance.getContainer();
+
+      Assert.assertEquals(0, comp.getNumSucceededInstances());
+      Assert.assertEquals(0, comp.getNumFailedInstances());
+      Assert.assertEquals(2, comp.getNumRunningInstances());
+      Assert.assertEquals(2, comp.getNumReadyInstances());
+      Assert.assertEquals(0, comp.getPendingInstances().size());
+
+      //stop 1 container
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          instanceContainer.getId(),
+          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+          "successful", 0);
+      comp.handle(new ComponentEvent(comp.getName(),
+          ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus));
+      componentInstance.handle(
+          new ComponentInstanceEvent(componentInstance.getContainer().getId(),
+              ComponentInstanceEventType.STOP).setStatus(containerStatus));
+
+      Assert.assertEquals(1, comp.getNumSucceededInstances());
+      Assert.assertEquals(0, comp.getNumFailedInstances());
+      Assert.assertEquals(1, comp.getNumRunningInstances());
+      Assert.assertEquals(1, comp.getNumReadyInstances());
+      Assert.assertEquals(0, comp.getPendingInstances().size());
+
+      org.apache.hadoop.yarn.service.component.ComponentState componentState =
+          Component.checkIfStable(comp);
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.service.component.ComponentState.STABLE,
+          componentState);
+    }
+  }
+
   private static org.apache.hadoop.yarn.service.api.records.Component
       createSpecWithEnv(String serviceName, String compName, String key,
       String val) {
@@ -171,31 +228,38 @@ public class TestComponent {
   public static ServiceContext createTestContext(
       ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
       throws Exception {
+    return createTestContext(fsWatcher,
+        TestServiceManager.createBaseDef(serviceName));
+  }
+
+  public static ServiceContext createTestContext(
+      ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef)
+      throws Exception {
     ServiceContext context = new ServiceContext();
-    context.service = TestServiceManager.createBaseDef(serviceName);
+    context.service = serviceDef;
     context.fs = fsWatcher.getFs();
 
     ContainerLaunchService mockLaunchService = mock(
         ContainerLaunchService.class);
 
     context.scheduler = new ServiceScheduler(context) {
-      @Override
-      protected YarnRegistryViewForProviders createYarnRegistryOperations(
+      @Override protected YarnRegistryViewForProviders
+      createYarnRegistryOperations(
           ServiceContext context, RegistryOperations registryClient) {
         return mock(YarnRegistryViewForProviders.class);
       }
 
-      @Override
-      public NMClientAsync createNMClient() {
+      @Override public NMClientAsync createNMClient() {
         NMClientAsync nmClientAsync = super.createNMClient();
         NMClient nmClient = mock(NMClient.class);
         try {
           when(nmClient.getContainerStatus(anyObject(), anyObject()))
-              .thenAnswer((Answer<ContainerStatus>) invocation ->
-                  ContainerStatus.newInstance(
-                      (ContainerId) invocation.getArguments()[0],
-                      org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-                      "", 0));
+              .thenAnswer(
+                  (Answer<ContainerStatus>) invocation -> ContainerStatus
+                      .newInstance((ContainerId) invocation.getArguments()[0],
+                          org.apache.hadoop.yarn.api.records.ContainerState
+                              .RUNNING,
+                          "", 0));
         } catch (YarnException | IOException e) {
           throw new RuntimeException(e);
         }
@@ -203,16 +267,18 @@ public class TestComponent {
         return nmClientAsync;
       }
 
-      @Override
-      public ContainerLaunchService getContainerLaunchService() {
+      @Override public ContainerLaunchService getContainerLaunchService() {
         return mockLaunchService;
       }
     };
     context.scheduler.init(fsWatcher.getConf());
 
+    ServiceTestUtils.createServiceManager(context);
+
     doNothing().when(mockLaunchService).
         reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
     stabilizeComponents(context);
+
     return context;
   }
 
@@ -223,6 +289,8 @@ public class TestComponent {
     context.attemptId = attemptId;
     Map<String, Component>
         componentState = context.scheduler.getAllComponents();
+
+    int counter = 0;
     for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
         context.service.getComponents()) {
       Component component = new org.apache.hadoop.yarn.service.component.
@@ -230,9 +298,12 @@ public class TestComponent {
       componentState.put(component.getName(), component);
       component.handle(new ComponentEvent(component.getName(),
           ComponentEventType.FLEX));
+
       for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
-        assignNewContainer(attemptId, i + 1, context, component);
+        counter++;
+        assignNewContainer(attemptId, counter, context, component);
       }
+
       component.handle(new ComponentEvent(component.getName(),
           ComponentEventType.CHECK_STABLE));
     }
@@ -241,6 +312,8 @@ public class TestComponent {
   private static void assignNewContainer(
       ApplicationAttemptId attemptId, long containerNum,
       ServiceContext context, Component component) {
+
+
     Container container = org.apache.hadoop.yarn.api.records.Container
         .newInstance(ContainerId.newContainerId(attemptId, containerNum),
             NODE_ID, "localhost", null, null,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
new file mode 100644
index 0000000..60f5c91
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for ComponentRestartPolicy implementations.
+ */
+public class TestComponentRestartPolicy {
+
+  @Test
+  public void testAlwaysRestartPolicy() throws Exception {
+
+    AlwaysRestartPolicy alwaysRestartPolicy = AlwaysRestartPolicy.getInstance();
+
+    Component component = mock(Component.class);
+    when(component.getNumReadyInstances()).thenReturn(1);
+    when(component.getNumDesiredInstances()).thenReturn(2);
+
+    ComponentInstance instance = mock(ComponentInstance.class);
+    when(instance.getComponent()).thenReturn(component);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+
+    assertEquals(true, alwaysRestartPolicy.isLongLived());
+    assertEquals(true, alwaysRestartPolicy.allowUpgrades());
+    assertEquals(false, alwaysRestartPolicy.hasCompleted(component));
+    assertEquals(false,
+        alwaysRestartPolicy.hasCompletedSuccessfully(component));
+
+    assertEquals(true,
+        alwaysRestartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(false, alwaysRestartPolicy.isReadyForDownStream(component));
+  }
+
+  @Test
+  public void testNeverRestartPolicy() throws Exception {
+
+    NeverRestartPolicy restartPolicy = NeverRestartPolicy.getInstance();
+
+    Component component = mock(Component.class);
+    when(component.getNumSucceededInstances()).thenReturn(new Long(1));
+    when(component.getNumFailedInstances()).thenReturn(new Long(2));
+    when(component.getNumDesiredInstances()).thenReturn(3);
+
+    ComponentInstance instance = mock(ComponentInstance.class);
+    when(instance.getComponent()).thenReturn(component);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+
+    assertEquals(false, restartPolicy.isLongLived());
+    assertEquals(false, restartPolicy.allowUpgrades());
+    assertEquals(true, restartPolicy.hasCompleted(component));
+    assertEquals(false,
+        restartPolicy.hasCompletedSuccessfully(component));
+
+    assertEquals(false,
+        restartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(true, restartPolicy.isReadyForDownStream(component));
+  }
+
+  @Test
+  public void testOnFailureRestartPolicy() throws Exception {
+
+    OnFailureRestartPolicy restartPolicy = OnFailureRestartPolicy.getInstance();
+
+    Component component = mock(Component.class);
+    when(component.getNumSucceededInstances()).thenReturn(new Long(3));
+    when(component.getNumFailedInstances()).thenReturn(new Long(0));
+    when(component.getNumDesiredInstances()).thenReturn(3);
+
+    ComponentInstance instance = mock(ComponentInstance.class);
+    when(instance.getComponent()).thenReturn(component);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+    when(containerStatus.getExitStatus()).thenReturn(0);
+
+    assertEquals(false, restartPolicy.isLongLived());
+    assertEquals(false, restartPolicy.allowUpgrades());
+    assertEquals(true, restartPolicy.hasCompleted(component));
+    assertEquals(true,
+        restartPolicy.hasCompletedSuccessfully(component));
+
+    assertEquals(false,
+        restartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(true, restartPolicy.isReadyForDownStream(component));
+
+
+    when(component.getNumSucceededInstances()).thenReturn(new Long(2));
+    when(component.getNumFailedInstances()).thenReturn(new Long(1));
+    when(component.getNumDesiredInstances()).thenReturn(3);
+
+    assertEquals(false, restartPolicy.hasCompleted(component));
+    assertEquals(false,
+        restartPolicy.hasCompletedSuccessfully(component));
+
+    when(containerStatus.getExitStatus()).thenReturn(-1000);
+
+    assertEquals(true,
+        restartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(false, restartPolicy.isReadyForDownStream(component));
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org