You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/09/26 18:53:47 UTC

[2/2] hadoop git commit: YARN-8665. Added Yarn service cancel upgrade option. Contributed by Chandni Singh

YARN-8665.  Added Yarn service cancel upgrade option.
            Contributed by Chandni Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/913f87da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/913f87da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/913f87da

Branch: refs/heads/trunk
Commit: 913f87dada27776c539dfb352400ecf8d40e7943
Parents: e0ff8e2
Author: Eric Yang <ey...@apache.org>
Authored: Wed Sep 26 14:51:35 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Wed Sep 26 14:51:35 2018 -0400

----------------------------------------------------------------------
 .../yarn/service/client/ApiServiceClient.java   |  20 ++
 .../hadoop/yarn/service/webapp/ApiServer.java   |  34 +-
 .../hadoop/yarn/service/ClientAMProtocol.java   |   5 +
 .../hadoop/yarn/service/ClientAMService.java    |  12 +
 .../hadoop/yarn/service/ServiceEvent.java       |  14 +-
 .../hadoop/yarn/service/ServiceEventType.java   |   3 +-
 .../hadoop/yarn/service/ServiceManager.java     | 331 +++++++++++++------
 .../service/api/records/ContainerState.java     |   2 +-
 .../yarn/service/api/records/ServiceState.java  |   7 +-
 .../yarn/service/client/ServiceClient.java      |  21 ++
 .../yarn/service/component/Component.java       | 275 ++++++++++-----
 .../yarn/service/component/ComponentEvent.java  |  10 -
 .../service/component/ComponentEventType.java   |   1 +
 .../yarn/service/component/ComponentState.java  |   3 +-
 .../component/instance/ComponentInstance.java   | 269 ++++++++++++---
 .../instance/ComponentInstanceEventType.java    |   3 +-
 .../instance/ComponentInstanceState.java        |   3 +-
 .../containerlaunch/ContainerLaunchService.java |   4 +-
 .../pb/client/ClientAMProtocolPBClientImpl.java |  13 +
 .../service/ClientAMProtocolPBServiceImpl.java  |  13 +
 .../yarn/service/provider/ProviderUtils.java    |   9 +-
 .../yarn/service/utils/ServiceApiUtil.java      |  14 +-
 .../yarn/service/utils/SliderFileSystem.java    |  49 +++
 .../src/main/proto/ClientAMProtocol.proto       |   8 +
 .../yarn/service/MockRunningServiceContext.java |  20 +-
 .../hadoop/yarn/service/ServiceTestUtils.java   |   2 +-
 .../hadoop/yarn/service/TestServiceManager.java | 136 ++++++--
 .../yarn/service/TestYarnNativeServices.java    |  44 +++
 .../yarn/service/client/TestServiceCLI.java     |  17 +
 .../yarn/service/component/TestComponent.java   | 239 +++++++++++--
 .../instance/TestComponentInstance.java         | 177 +++++++++-
 .../src/test/resources/log4j.properties         |  19 ++
 .../hadoop/yarn/client/cli/ApplicationCLI.java  |  12 +-
 .../hadoop/yarn/client/cli/TestYarnCLI.java     |   2 +
 .../hadoop/yarn/client/api/AppAdminClient.java  |  13 +
 .../containermanager/ContainerManagerImpl.java  |   1 +
 36 files changed, 1470 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index ca6cc50..b7a1541 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -723,4 +723,24 @@ public class ApiServiceClient extends AppAdminClient {
     }
     return null;
   }
+
+  @Override
+  public int actionCancelUpgrade(
+      String appName) throws IOException, YarnException {
+    int result;
+    try {
+      Service service = new Service();
+      service.setName(appName);
+      service.setState(ServiceState.CANCEL_UPGRADING);
+      String buffer = jsonSerDeser.toJson(service);
+      LOG.info("Cancel upgrade in progress. Please wait..");
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Failed to cancel upgrade: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index cd6f0d7..c4e3317 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -58,6 +58,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
+import static org.apache.hadoop.yarn.service.api.records.ServiceState.CANCEL_UPGRADING;
 import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
 import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
 
@@ -445,6 +446,12 @@ public class ApiServer {
         return upgradeService(updateServiceData, ugi);
       }
 
+      // If CANCEL_UPGRADING is requested
+      if (updateServiceData.getState() != null &&
+          updateServiceData.getState() == CANCEL_UPGRADING) {
+        return cancelUpgradeService(appName, ugi);
+      }
+
       // If new lifetime value specified then update it
       if (updateServiceData.getLifetime() != null
           && updateServiceData.getLifetime() > 0) {
@@ -460,8 +467,7 @@ public class ApiServer {
       LOG.error(message, e);
       return formatResponse(Status.NOT_FOUND, e.getMessage());
     } catch (YarnException e) {
-      String message = "Service is not found in hdfs: " + appName;
-      LOG.error(message, e);
+      LOG.error(e.getMessage(), e);
       return formatResponse(Status.NOT_FOUND, e.getMessage());
     } catch (Exception e) {
       String message = "Error while performing operation for app: " + appName;
@@ -707,6 +713,27 @@ public class ApiServer {
     return formatResponse(Status.ACCEPTED, status);
   }
 
+  private Response cancelUpgradeService(String serviceName,
+      final UserGroupInformation ugi) throws IOException, InterruptedException {
+    int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      int exitCode = sc.actionCancelUpgrade(serviceName);
+      sc.close();
+      return exitCode;
+    });
+    if (result == EXIT_SUCCESS) {
+      ServiceStatus status = new ServiceStatus();
+      LOG.info("Service {} cancelling upgrade", serviceName);
+      status.setDiagnostics("Service " + serviceName +
+          " cancelling upgrade.");
+      status.setState(ServiceState.ACCEPTED);
+      return formatResponse(Status.ACCEPTED, status);
+    }
+    return Response.status(Status.BAD_REQUEST).build();
+  }
+
   private Response processComponentsUpgrade(UserGroupInformation ugi,
       String serviceName, Set<String> compNames) throws YarnException,
       IOException, InterruptedException {
@@ -734,7 +761,8 @@ public class ApiServer {
       Service service, List<Container> containers) throws YarnException,
       IOException, InterruptedException {
 
-    if (service.getState() != ServiceState.UPGRADING) {
+    if (!service.getState().equals(ServiceState.UPGRADING) &&
+        !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
       throw new YarnException(
           String.format("The upgrade of service %s has not been initiated.",
               service.getName()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
index 652a314..39e7dfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -60,4 +62,7 @@ public interface ClientAMProtocol {
 
   GetCompInstancesResponseProto getCompInstances(
       GetCompInstancesRequestProto request) throws IOException, YarnException;
+
+  CancelUpgradeResponseProto cancelUpgrade(
+      CancelUpgradeRequestProto request) throws IOException, YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 2ef8f7e..47e9829 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
@@ -208,4 +210,14 @@ public class ClientAMService extends AbstractService
         ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
             new Container[containers.size()]))).build();
   }
+
+  @Override
+  public CancelUpgradeResponseProto cancelUpgrade(
+      CancelUpgradeRequestProto request) throws IOException, YarnException {
+    LOG.info("Cancel service upgrade by {}",
+        UserGroupInformation.getCurrentUser());
+    ServiceEvent event = new ServiceEvent(ServiceEventType.CANCEL_UPGRADE);
+    context.scheduler.getDispatcher().getEventHandler().handle(event);
+    return CancelUpgradeResponseProto.newBuilder().build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
index 3a55472..cf4455d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.service;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.service.api.records.Component;
 
-import java.util.Queue;
+import java.util.List;
 
 /**
  * Events are handled by {@link ServiceManager} to manage the service
@@ -33,7 +33,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
   private String version;
   private boolean autoFinalize;
   private boolean expressUpgrade;
-  private Queue<Component> compsToUpgradeInOrder;
+  // For an express upgrade, these must be listed in upgrade order.
+  private List<Component> compsToUpgrade;
 
   public ServiceEvent(ServiceEventType serviceEventType) {
     super(serviceEventType);
@@ -71,13 +72,12 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
     return this;
   }
 
-  public Queue<Component> getCompsToUpgradeInOrder() {
-    return compsToUpgradeInOrder;
+  public List<Component> getCompsToUpgrade() {
+    return compsToUpgrade;
   }
 
-  public ServiceEvent setCompsToUpgradeInOrder(
-      Queue<Component> compsToUpgradeInOrder) {
-    this.compsToUpgradeInOrder = compsToUpgradeInOrder;
+  public ServiceEvent setCompsToUpgrade(List<Component> compsToUpgrade) {
+    this.compsToUpgrade = compsToUpgrade;
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
index 4fc420b..03afdd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
@@ -24,5 +24,6 @@ package org.apache.hadoop.yarn.service;
 public enum ServiceEventType {
   START,
   UPGRADE,
-  CHECK_STABLE
+  CHECK_STABLE,
+  CANCEL_UPGRADE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index 04454b1..4851325 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -20,32 +20,31 @@ package org.apache.hadoop.yarn.service;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
 import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
-import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
-import org.apache.hadoop.yarn.state.MultipleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
-import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.state.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@@ -71,8 +70,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
   private final SliderFileSystem fs;
 
   private String upgradeVersion;
-  private Queue<org.apache.hadoop.yarn.service.api.records
-        .Component> compsToUpgradeInOrder;
+  private List<org.apache.hadoop.yarn.service.api.records
+      .Component> componentsToUpgrade;
+  private List<String> compsAffectedByUpgrade = new ArrayList<>();
+  private String cancelledVersion;
 
   private static final StateMachineFactory<ServiceManager, State,
       ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@@ -88,11 +89,14 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
 
           .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
               State.UPGRADING), ServiceEventType.START,
-              new CheckStableTransition())
+              new StartFromUpgradeTransition())
 
           .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
               State.UPGRADING), ServiceEventType.CHECK_STABLE,
               new CheckStableTransition())
+
+          .addTransition(State.UPGRADING, State.UPGRADING,
+              ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition())
           .installTopology();
 
   public ServiceManager(ServiceContext context) {
@@ -148,19 +152,25 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
       serviceManager.upgradeVersion = event.getVersion();
+      serviceManager.componentsToUpgrade = event.getCompsToUpgrade();
+      event.getCompsToUpgrade().forEach(comp ->
+          serviceManager.compsAffectedByUpgrade.add(comp.getName()));
       try {
         if (event.isExpressUpgrade()) {
-          serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
-          serviceManager.compsToUpgradeInOrder = event
-              .getCompsToUpgradeInOrder();
-          serviceManager.upgradeNextCompIfAny();
+          serviceManager.dispatchNeedUpgradeEvents(false);
+          serviceManager.upgradeNextCompIfAny(false);
+        } else {
+          serviceManager.dispatchNeedUpgradeEvents(false);
+        }
+
+        if (event.isExpressUpgrade()) {
+          serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING);
         } else if (event.isAutoFinalize()) {
-          serviceManager.serviceSpec.setState(ServiceState
-              .UPGRADING_AUTO_FINALIZE);
+          serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE);
         } else {
-          serviceManager.serviceSpec.setState(
-              ServiceState.UPGRADING);
+          serviceManager.setServiceState(ServiceState.UPGRADING);
         }
+
         return State.UPGRADING;
       } catch (Throwable e) {
         LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@@ -181,24 +191,32 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       if (currState.equals(ServiceState.STABLE)) {
         return State.STABLE;
       }
-      if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
-        org.apache.hadoop.yarn.service.api.records.Component component =
-            serviceManager.compsToUpgradeInOrder.peek();
-        if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
-            !component.getState().equals(ComponentState.UPGRADING)) {
-          serviceManager.compsToUpgradeInOrder.remove();
+      if (currState.equals(ServiceState.EXPRESS_UPGRADING) ||
+          currState.equals(ServiceState.CANCEL_UPGRADING)) {
+
+        if (!serviceManager.componentsToUpgrade.isEmpty()) {
+          org.apache.hadoop.yarn.service.api.records.Component compSpec =
+              serviceManager.componentsToUpgrade.get(0);
+          Component component = serviceManager.scheduler.getAllComponents()
+              .get(compSpec.getName());
+
+          if (!component.isUpgrading()) {
+            serviceManager.componentsToUpgrade.remove(0);
+            serviceManager.upgradeNextCompIfAny(
+                currState.equals(ServiceState.CANCEL_UPGRADING));
+          }
         }
-        serviceManager.upgradeNextCompIfAny();
       }
+
       if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
-          event.getType().equals(ServiceEventType.START) ||
-          (currState.equals(ServiceState.EXPRESS_UPGRADING) &&
-              serviceManager.compsToUpgradeInOrder.isEmpty())) {
+          ((currState.equals(ServiceState.EXPRESS_UPGRADING) ||
+              currState.equals(ServiceState.CANCEL_UPGRADING)) &&
+              serviceManager.componentsToUpgrade.isEmpty())) {
+
         ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
         if (targetState.equals(ServiceState.STABLE)) {
-          if (serviceManager.finalizeUpgrade()) {
-            LOG.info("Service def state changed from {} -> {}", currState,
-                serviceManager.serviceSpec.getState());
+          if (serviceManager.finalizeUpgrade(
+              currState.equals(ServiceState.CANCEL_UPGRADING))) {
             return State.STABLE;
           }
         }
@@ -207,52 +225,149 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     }
   }
 
-  private void upgradeNextCompIfAny() {
-    if (!compsToUpgradeInOrder.isEmpty()) {
+  private static class StartFromUpgradeTransition implements
+      MultipleArcTransition<ServiceManager, ServiceEvent, State> {
+
+    @Override
+    public State transition(ServiceManager serviceManager, ServiceEvent event) {
+      ServiceState currState = serviceManager.serviceSpec.getState();
+
+      ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
+      if (targetState.equals(ServiceState.STABLE)) {
+        if (serviceManager.finalizeUpgrade(
+            currState.equals(ServiceState.CANCEL_UPGRADING))) {
+          return State.STABLE;
+        }
+      }
+      return State.UPGRADING;
+    }
+  }
+
+  private static class CancelUpgradeTransition implements
+      SingleArcTransition<ServiceManager, ServiceEvent> {
+
+    @Override
+    public void transition(ServiceManager serviceManager,
+        ServiceEvent event) {
+      if (!serviceManager.getState().equals(State.UPGRADING)) {
+        LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state",
+            serviceManager.getState());
+        return;
+      }
+      try {
+        Service targetSpec = ServiceApiUtil.loadService(
+            serviceManager.context.fs, serviceManager.getName());
+
+        Service sourceSpec = ServiceApiUtil.loadServiceUpgrade(
+            serviceManager.context.fs, serviceManager.getName(),
+            serviceManager.upgradeVersion);
+        serviceManager.cancelledVersion = serviceManager.upgradeVersion;
+        LOG.info("[SERVICE] cancel version {}",
+            serviceManager.cancelledVersion);
+        serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion();
+        serviceManager.componentsToUpgrade = serviceManager
+            .resolveCompsToUpgrade(sourceSpec, targetSpec);
+
+        serviceManager.compsAffectedByUpgrade.clear();
+        serviceManager.componentsToUpgrade.forEach(comp ->
+            serviceManager.compsAffectedByUpgrade.add(comp.getName()));
+
+        serviceManager.dispatchNeedUpgradeEvents(true);
+        serviceManager.upgradeNextCompIfAny(true);
+        serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING);
+      } catch (Throwable e) {
+        LOG.error("[SERVICE]: Cancellation of upgrade failed", e);
+      }
+    }
+  }
+
+  private void upgradeNextCompIfAny(boolean cancelUpgrade) {
+    if (!componentsToUpgrade.isEmpty()) {
       org.apache.hadoop.yarn.service.api.records.Component component =
-          compsToUpgradeInOrder.peek();
+          componentsToUpgrade.get(0);
+
+      serviceSpec.getComponent(component.getName()).getContainers().forEach(
+          container -> {
+            ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+                ContainerId.fromString(container.getId()),
+                !cancelUpgrade ? ComponentInstanceEventType.UPGRADE :
+                    ComponentInstanceEventType.CANCEL_UPGRADE);
+            LOG.info("Upgrade container {} {}", container.getId(),
+                cancelUpgrade);
+            dispatcher.getEventHandler().handle(upgradeEvent);
+          });
+    }
+  }
 
-      ComponentEvent needUpgradeEvent = new ComponentEvent(
-          component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
-          component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
-      context.scheduler.getDispatcher().getEventHandler().handle(
-          needUpgradeEvent);
+  private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) {
+    if (componentsToUpgrade != null) {
+      componentsToUpgrade.forEach(component -> {
+        ComponentEvent needUpgradeEvent = new ComponentEvent(
+            component.getName(), !cancelUpgrade ? ComponentEventType.UPGRADE :
+            ComponentEventType.CANCEL_UPGRADE)
+            .setTargetSpec(component)
+            .setUpgradeVersion(upgradeVersion);
+        LOG.info("Upgrade component {} {}", component.getName(), cancelUpgrade);
+        context.scheduler.getDispatcher().getEventHandler()
+            .handle(needUpgradeEvent);
+      });
     }
   }
 
   /**
    * @return whether finalization of upgrade was successful.
    */
-  private boolean finalizeUpgrade() {
-    try {
-      // save the application id and state to
-      Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
-          fs, getName(), upgradeVersion);
-      targetSpec.setId(serviceSpec.getId());
-      targetSpec.setState(ServiceState.STABLE);
-      Map<String, Component> allComps = scheduler.getAllComponents();
-      targetSpec.getComponents().forEach(compSpec -> {
-        Component comp = allComps.get(compSpec.getName());
-        compSpec.setState(comp.getComponentSpec().getState());
-      });
-      jsonSerDeser.save(fs.getFileSystem(),
-          ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
-      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
-    } catch (IOException e) {
-      LOG.error("Upgrade did not complete because unable to re-write the" +
-          " service definition", e);
-      return false;
+  private boolean finalizeUpgrade(boolean cancelUpgrade) {
+    if (!cancelUpgrade) {
+      try {
+        // copy the application id and STABLE state to the target spec
+        Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
+            fs, getName(), upgradeVersion);
+        targetSpec.setId(serviceSpec.getId());
+        targetSpec.setState(ServiceState.STABLE);
+        Map<String, Component> allComps = scheduler.getAllComponents();
+        targetSpec.getComponents().forEach(compSpec -> {
+          Component comp = allComps.get(compSpec.getName());
+          compSpec.setState(comp.getComponentSpec().getState());
+        });
+        jsonSerDeser.save(fs.getFileSystem(),
+            ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
+      } catch (IOException e) {
+        LOG.error("Upgrade did not complete because unable to re-write the" +
+            " service definition", e);
+        return false;
+      }
     }
 
     try {
-      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
+      String upgradeVersionToDel = cancelUpgrade? cancelledVersion :
+          upgradeVersion;
+      LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel);
+      fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel);
+
+      for (String comp : compsAffectedByUpgrade) {
+        String compDirVersionToDel = cancelUpgrade? cancelledVersion :
+            serviceSpec.getVersion();
+        LOG.info("[SERVICE]: delete {} dir version {}",  comp,
+            compDirVersionToDel);
+        fs.deleteComponentDir(compDirVersionToDel, comp);
+      }
+
+      if (cancelUpgrade) {
+        fs.deleteComponentsVersionDirIfEmpty(cancelledVersion);
+      } else {
+        fs.deleteComponentsVersionDirIfEmpty(serviceSpec.getVersion());
+      }
+
     } catch (IOException e) {
       LOG.warn("Unable to delete upgrade definition for service {} " +
           "version {}", getName(), upgradeVersion);
     }
-    serviceSpec.setState(ServiceState.STABLE);
+    setServiceState(ServiceState.STABLE);
     serviceSpec.setVersion(upgradeVersion);
     upgradeVersion = null;
+    cancelledVersion = null;
+    compsAffectedByUpgrade.clear();
     return true;
   }
 
@@ -291,30 +406,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
         context.fs, context.service.getName(), upgradeVersion);
 
     List<org.apache.hadoop.yarn.service.api.records.Component>
-        compsNeedUpgradeList = componentsFinder.
-        findTargetComponentSpecs(context.service, targetSpec);
-
-    // remove all components from need upgrade list if there restart policy
-    // doesn't all upgrade.
-    if (compsNeedUpgradeList != null) {
-      compsNeedUpgradeList.removeIf(component -> {
-        org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
-            restartPolicy = component.getRestartPolicy();
-
-        final ComponentRestartPolicy restartPolicyHandler =
-            Component.getRestartPolicyHandler(restartPolicy);
-        // Do not allow upgrades for components which have NEVER/ON_FAILURE
-        // restart policy
-        if (!restartPolicyHandler.allowUpgrades()) {
-          LOG.info("The component {} has a restart policy that doesnt " +
-                  "allow upgrades {} ", component.getName(),
-              component.getRestartPolicy().toString());
-          return true;
-        }
-
-        return false;
-      });
-    }
+        compsNeedUpgradeList = resolveCompsToUpgrade(context.service,
+        targetSpec);
 
     ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
         .setVersion(upgradeVersion)
@@ -334,7 +427,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       List<String> resolvedComps = ServiceApiUtil
           .resolveCompsDependency(targetSpec);
 
-      Queue<org.apache.hadoop.yarn.service.api.records.Component>
+      List<org.apache.hadoop.yarn.service.api.records.Component>
           orderedCompUpgrade = new LinkedList<>();
       resolvedComps.forEach(compName -> {
         org.apache.hadoop.yarn.service.api.records.Component component =
@@ -343,30 +436,68 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
           orderedCompUpgrade.add(component);
         }
       });
-      event.setCompsToUpgradeInOrder(orderedCompUpgrade);
+      event.setCompsToUpgrade(orderedCompUpgrade);
+    } else {
+      event.setCompsToUpgrade(compsNeedUpgradeList);
     }
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        event);
 
-    context.scheduler.getDispatcher().getEventHandler().handle(event);
-
-    if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
-      if (!expressUpgrade) {
-        compsNeedUpgradeList.forEach(component -> {
-          ComponentEvent needUpgradeEvent = new ComponentEvent(
-              component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
-              component).setUpgradeVersion(event.getVersion());
-          context.scheduler.getDispatcher().getEventHandler().handle(
-              needUpgradeEvent);
-
-        });
-      }
-    }  else if (autoFinalize) {
-      // nothing to upgrade if upgrade auto finalize is requested, trigger a
+    if (autoFinalize && (compsNeedUpgradeList == null ||
+        compsNeedUpgradeList.isEmpty())) {
+      // nothing to upgrade and auto finalize is requested, trigger a
       // state check.
       context.scheduler.getDispatcher().getEventHandler().handle(
           new ServiceEvent(ServiceEventType.CHECK_STABLE));
     }
   }
 
+  private List<org.apache.hadoop.yarn.service.api.records.Component>
+      resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) {
+
+    List<org.apache.hadoop.yarn.service.api.records.Component>
+        compsNeedUpgradeList = componentsFinder.
+        findTargetComponentSpecs(sourceSpec, targetSpec);
+
+    // remove all components from need upgrade list if their restart policy
+    // doesn't allow upgrades.
+    if (compsNeedUpgradeList != null) {
+      compsNeedUpgradeList.removeIf(component -> {
+        org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
+            restartPolicy = component.getRestartPolicy();
+
+        final ComponentRestartPolicy restartPolicyHandler =
+            Component.getRestartPolicyHandler(restartPolicy);
+        // Do not allow upgrades for components which have NEVER/ON_FAILURE
+        // restart policy
+        if (!restartPolicyHandler.allowUpgrades()) {
+          LOG.info("The component {} has a restart policy that doesnt " +
+                  "allow upgrades {} ", component.getName(),
+              component.getRestartPolicy().toString());
+          return true;
+        }
+
+        return false;
+      });
+    }
+
+    return compsNeedUpgradeList;
+  }
+
+  /**
+   * Sets the state of the service in the service spec.
+   * @param state service state
+   */
+  private void setServiceState(
+      org.apache.hadoop.yarn.service.api.records.ServiceState state) {
+    org.apache.hadoop.yarn.service.api.records.ServiceState curState =
+        serviceSpec.getState();
+    if (!curState.equals(state)) {
+      serviceSpec.setState(state);
+      LOG.info("[SERVICE] spec state changed from {} -> {}", curState, state);
+    }
+  }
+
   /**
    * Returns the name of the service.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
index cac527a..a6e9a2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
@@ -27,5 +27,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public enum ContainerState {
   RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
-  FAILED;
+  FAILED, FAILED_UPGRADE;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index 49c1985..3f2f4f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -29,5 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 @ApiModel(description = "The current state of an service.")
 public enum ServiceState {
   ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
-  UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
+  UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED, CANCEL_UPGRADING;
+
+  public static boolean isUpgrading(ServiceState state) {
+    return state.equals(UPGRADING) || state.equals(UPGRADING_AUTO_FINALIZE)
+        || state.equals(EXPRESS_UPGRADING);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index a27ed87..23db57e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -353,6 +354,26 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
   }
 
   @Override
+  public int actionCancelUpgrade(String appName) throws IOException,
+      YarnException {
+    Service liveService = getStatus(appName);
+    if (liveService == null ||
+        !ServiceState.isUpgrading(liveService.getState())) {
+      throw new YarnException("Service " + appName + " is not upgrading, " +
+          "so nothing to cancel.");
+    }
+
+    ApplicationReport appReport = yarnClient.getApplicationReport(
+        getAppId(appName));
+    if (StringUtils.isEmpty(appReport.getHost())) {
+      throw new YarnException(appName + " AM hostname is empty");
+    }
+    ClientAMProtocol proxy = createAMProxy(appName, appReport);
+    proxy.cancelUpgrade(CancelUpgradeRequestProto.newBuilder().build());
+    return EXIT_SUCCESS;
+  }
+
+  @Override
   public int actionCleanUp(String appName, String userName) throws
       IOException, YarnException {
     if (cleanUpRegistry(appName, userName)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index acf3404..526bde0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import static org.apache.hadoop.yarn.service.api.records.Component
@@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
@@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
 import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE;
 import static org.apache.hadoop.yarn.service.component.ComponentState.*;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
 import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
@@ -126,9 +126,8 @@ public class Component implements EventHandler<ComponentEvent> {
       new ConcurrentHashMap<>();
   private boolean healthThresholdMonitorEnabled = false;
 
-  private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
-  private ComponentEvent upgradeEvent;
-  private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
+  private UpgradeStatus upgradeStatus = new UpgradeStatus();
+  private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus();
 
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
@@ -160,6 +159,8 @@ public class Component implements EventHandler<ComponentEvent> {
           // Flex while previous flex is still in progress
           .addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
               new FlexComponentTransition())
+          .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
 
           // container failed while stable
           .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
@@ -172,19 +173,28 @@ public class Component implements EventHandler<ComponentEvent> {
           // For flex down, go to STABLE state
           .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
               FLEX, new FlexComponentTransition())
-          .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
-              new ComponentNeedsUpgradeTransition())
-          //Upgrade while previous upgrade is still in progress
-          .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
-              new ComponentNeedsUpgradeTransition())
-          .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
-              CHECK_STABLE, new CheckStableTransition())
-          .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
-              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(STABLE, UPGRADING, UPGRADE,
+              new NeedsUpgradeTransition())
+          .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
+              new NeedsUpgradeTransition())
           .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
               new CheckStableTransition())
-          .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
-              new ContainerCompletedTransition())
+
+          // Cancel upgrade while previous upgrade is still in progress
+          .addTransition(UPGRADING, CANCEL_UPGRADING,
+              CANCEL_UPGRADE, new NeedsUpgradeTransition())
+          .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
+              new CompletedAfterUpgradeTransition())
+
+          .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
+              STABLE), CHECK_STABLE, new CheckStableTransition())
+          .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING,
+              CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
+          .addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
+              new ContainerAllocatedTransition())
+
           .installTopology();
 
   public Component(
@@ -332,7 +342,7 @@ public class Component implements EventHandler<ComponentEvent> {
                 + before + " to " + event.getDesired());
         component.requestContainers(delta);
         component.createNumCompInstances(delta);
-        component.componentSpec.setState(
+        component.setComponentState(
             org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
         component.getScheduler().getApp().setState(ServiceState.STARTED);
         return FLEXING;
@@ -430,11 +440,11 @@ public class Component implements EventHandler<ComponentEvent> {
     if (component.getNumRunningInstances() + component
         .getNumSucceededInstances() + component.getNumFailedInstances()
         < component.getComponentSpec().getNumberOfContainers()) {
-      component.componentSpec.setState(
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
       return FLEXING;
     } else{
-      component.componentSpec.setState(
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
     }
@@ -444,22 +454,22 @@ public class Component implements EventHandler<ComponentEvent> {
       Component component) {
     // if desired == running
     if (component.componentMetrics.containersReady.value() == component
-        .getComponentSpec().getNumberOfContainers()
-        && component.numContainersThatNeedUpgrade.get() == 0) {
-      component.componentSpec.setState(
+        .getComponentSpec().getNumberOfContainers() &&
+        !component.doesNeedUpgrade()) {
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
+    } else if (component.doesNeedUpgrade()) {
+      component.setComponentState(org.apache.hadoop.yarn.service.api.records.
+          ComponentState.NEEDS_UPGRADE);
+      return component.getState();
     } else if (component.componentMetrics.containersReady.value() != component
         .getComponentSpec().getNumberOfContainers()) {
-      component.componentSpec.setState(
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
       return FLEXING;
-    } else {
-      //  component.numContainersThatNeedUpgrade.get() > 0
-      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
-          records.ComponentState.NEEDS_UPGRADE);
-      return UPGRADING;
     }
+    return component.getState();
   }
 
   // This method should be called whenever there is an increment or decrement
@@ -467,22 +477,16 @@ public class Component implements EventHandler<ComponentEvent> {
   //This should not matter for terminating components
   private static synchronized void checkAndUpdateComponentState(
       Component component, boolean isIncrement) {
-    org.apache.hadoop.yarn.service.api.records.ComponentState curState =
-        component.componentSpec.getState();
 
     if (component.getRestartPolicyHandler().isLongLived()) {
       if (isIncrement) {
         // check if all containers are in READY state
-        if (component.numContainersThatNeedUpgrade.get() == 0
-            && component.componentMetrics.containersReady.value()
-            == component.componentMetrics.containersDesired.value()) {
-          component.componentSpec.setState(
+        if (!component.upgradeStatus.areContainersUpgrading() &&
+            !component.cancelUpgradeStatus.areContainersUpgrading() &&
+            component.componentMetrics.containersReady.value() ==
+                component.componentMetrics.containersDesired.value()) {
+          component.setComponentState(
               org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
-          if (curState != component.componentSpec.getState()) {
-            LOG.info("[COMPONENT {}] state changed from {} -> {}",
-                component.componentSpec.getName(), curState,
-                component.componentSpec.getState());
-          }
           // component state change will trigger re-check of service state
           component.context.getServiceManager().checkAndUpdateServiceState();
         }
@@ -491,19 +495,14 @@ public class Component implements EventHandler<ComponentEvent> {
         // still need to verify the count before changing the component state
         if (component.componentMetrics.containersReady.value()
             < component.componentMetrics.containersDesired.value()) {
-          component.componentSpec.setState(
+          component.setComponentState(
               org.apache.hadoop.yarn.service.api.records.ComponentState
                   .FLEXING);
         } else if (component.componentMetrics.containersReady.value()
             == component.componentMetrics.containersDesired.value()) {
-          component.componentSpec.setState(
+          component.setComponentState(
               org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
         }
-        if (curState != component.componentSpec.getState()) {
-          LOG.info("[COMPONENT {}] state changed from {} -> {}",
-              component.componentSpec.getName(), curState,
-              component.componentSpec.getState());
-        }
         // component state change will trigger re-check of service state
         component.context.getServiceManager().checkAndUpdateServiceState();
       }
@@ -511,8 +510,8 @@ public class Component implements EventHandler<ComponentEvent> {
       // component state change will trigger re-check of service state
       component.context.getServiceManager().checkAndUpdateServiceState();
     }
-    // when the service is stable then the state of component needs to
-    // transition to stable
+    // triggers the state machine in component to reach appropriate state
+    // once the state in spec is changed.
     component.dispatcher.getEventHandler().handle(
         new ComponentEvent(component.getName(),
             ComponentEventType.CHECK_STABLE));
@@ -544,25 +543,43 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  private static class ComponentNeedsUpgradeTransition extends BaseTransition {
+  private static class CompletedAfterUpgradeTransition extends BaseTransition {
     @Override
     public void transition(Component component, ComponentEvent event) {
-      component.upgradeInProgress.set(true);
-      component.upgradeEvent = event;
-      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
-          records.ComponentState.NEEDS_UPGRADE);
-      component.numContainersThatNeedUpgrade.set(
+      Preconditions.checkNotNull(event.getContainerId());
+      component.updateMetrics(event.getStatus());
+      component.dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getContainerId(), STOP)
+              .setStatus(event.getStatus()));
+    }
+  }
+
+  private static class NeedsUpgradeTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      boolean isCancel = event.getType().equals(CANCEL_UPGRADE);
+      UpgradeStatus status = !isCancel ? component.upgradeStatus :
+          component.cancelUpgradeStatus;
+
+      status.inProgress.set(true);
+      status.targetSpec = event.getTargetSpec();
+      status.targetVersion = event.getUpgradeVersion();
+      LOG.info("[COMPONENT {}]: need upgrade to {}",
+          component.getName(), status.targetVersion);
+
+      status.containersNeedUpgrade.set(
           component.componentSpec.getNumberOfContainers());
-      component.componentSpec.getContainers().forEach(container -> {
-        container.setState(ContainerState.NEEDS_UPGRADE);
-        if (event.isExpressUpgrade()) {
-          ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
-              ContainerId.fromString(container.getId()),
-                  ComponentInstanceEventType.UPGRADE);
-          LOG.info("Upgrade container {}", container.getId());
-          component.dispatcher.getEventHandler().handle(upgradeEvent);
-        }
+
+      component.setComponentState(org.apache.hadoop.yarn.service.api.
+          records.ComponentState.NEEDS_UPGRADE);
+
+      component.getAllComponentInstances().forEach(instance -> {
+        instance.setContainerState(ContainerState.NEEDS_UPGRADE);
       });
+
+      if (event.getType().equals(CANCEL_UPGRADE)) {
+        component.upgradeStatus.reset();
+      }
     }
   }
 
@@ -572,22 +589,22 @@ public class Component implements EventHandler<ComponentEvent> {
     @Override
     public ComponentState transition(Component component,
         ComponentEvent componentEvent) {
-      org.apache.hadoop.yarn.service.api.records.ComponentState currState =
-          component.componentSpec.getState();
-      if (currState.equals(org.apache.hadoop.yarn.service.api.records
-          .ComponentState.STABLE)) {
-        return ComponentState.STABLE;
-      }
       // checkIfStable also updates the state in definition when STABLE
       ComponentState targetState = checkIfStable(component);
-      if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
-        component.componentSpec.overwrite(
-            component.upgradeEvent.getTargetSpec());
-        component.upgradeEvent = null;
+
+      if (targetState.equals(STABLE) &&
+          !(component.upgradeStatus.isCompleted() &&
+              component.cancelUpgradeStatus.isCompleted())) {
+        // Component stable after upgrade or cancel upgrade
+        UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ?
+            component.cancelUpgradeStatus : component.upgradeStatus;
+
+        component.componentSpec.overwrite(status.getTargetSpec());
+        status.reset();
+
         ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
             CHECK_STABLE);
         component.dispatcher.getEventHandler().handle(checkStable);
-        component.upgradeInProgress.set(false);
       }
       return targetState;
     }
@@ -625,11 +642,14 @@ public class Component implements EventHandler<ComponentEvent> {
         "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
         getName(), container.getId(), instance.getCompInstanceName(),
         container.getNodeId());
-    if (upgradeInProgress.get()) {
+    if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) {
+      UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ?
+          cancelUpgradeStatus : upgradeStatus;
+
       scheduler.getContainerLaunchService()
           .launchCompInstance(scheduler.getApp(), instance, container,
-              createLaunchContext(upgradeEvent.getTargetSpec(),
-                  upgradeEvent.getUpgradeVersion()));
+              createLaunchContext(status.getTargetSpec(),
+                  status.getTargetVersion()));
     } else {
       scheduler.getContainerLaunchService().launchCompInstance(
           scheduler.getApp(), instance, container,
@@ -830,6 +850,12 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
+  private boolean doesNeedUpgrade() {
+    return cancelUpgradeStatus.areContainersUpgrading() ||
+        upgradeStatus.areContainersUpgrading() ||
+        upgradeStatus.failed.get();
+  }
+
   public boolean areDependenciesReady() {
     List<String> dependencies = componentSpec.getDependencies();
     if (ServiceUtils.isEmpty(dependencies)) {
@@ -911,10 +937,6 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  public void decContainersThatNeedUpgrade() {
-    numContainersThatNeedUpgrade.decrementAndGet();
-  }
-
   public int getNumReadyInstances() {
     return componentMetrics.containersReady.value();
   }
@@ -972,10 +994,33 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  public ComponentEvent getUpgradeEvent() {
+  /**
+   * Returns whether a component is upgrading or not.
+   */
+  public boolean isUpgrading() {
+    this.readLock.lock();
+
+    try {
+      return !(upgradeStatus.isCompleted() &&
+          cancelUpgradeStatus.isCompleted());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public UpgradeStatus getUpgradeStatus() {
+    this.readLock.lock();
+    try {
+      return upgradeStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public UpgradeStatus getCancelUpgradeStatus() {
     this.readLock.lock();
     try {
-      return upgradeEvent;
+      return cancelUpgradeStatus;
     } finally {
       this.readLock.unlock();
     }
@@ -1013,6 +1058,70 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
+  /**
+   * Sets the state of the component in the component spec.
+   * @param state component state
+   */
+  private void setComponentState(
+      org.apache.hadoop.yarn.service.api.records.ComponentState state) {
+    org.apache.hadoop.yarn.service.api.records.ComponentState curState =
+        componentSpec.getState();
+    if (!curState.equals(state)) {
+      componentSpec.setState(state);
+      LOG.info("[COMPONENT {}] spec state changed from {} -> {}",
+          componentSpec.getName(), curState, state);
+    }
+  }
+
+  /**
+   * Status of upgrade.
+   */
+  public static class UpgradeStatus {
+    private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
+    private String targetVersion;
+    private AtomicBoolean inProgress = new AtomicBoolean(false);
+    private AtomicLong containersNeedUpgrade = new AtomicLong(0);
+    private AtomicBoolean failed = new AtomicBoolean(false);
+
+    public org.apache.hadoop.yarn.service.api.records.
+        Component getTargetSpec() {
+      return targetSpec;
+    }
+
+    public String getTargetVersion() {
+      return targetVersion;
+    }
+
+    /**
+     * @return whether the upgrade is completed or not
+     */
+    public boolean isCompleted() {
+      return !inProgress.get();
+    }
+
+    public void decContainersThatNeedUpgrade() {
+      if (inProgress.get()) {
+        containersNeedUpgrade.decrementAndGet();
+      }
+    }
+
+    public void containerFailedUpgrade() {
+      failed.set(true);
+    }
+
+    void reset() {
+      containersNeedUpgrade.set(0);
+      targetSpec = null;
+      targetVersion = null;
+      inProgress.set(false);
+      failed.set(false);
+    }
+
+    boolean areContainersUpgrading() {
+      return containersNeedUpgrade.get() != 0;
+    }
+  }
+
   public ServiceContext getContext() {
     return context;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 643961d..84caa77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -35,7 +35,6 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
   private ContainerId containerId;
   private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
   private String upgradeVersion;
-  private boolean expressUpgrade;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -114,13 +113,4 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
     this.upgradeVersion = upgradeVersion;
     return this;
   }
-
-  public boolean isExpressUpgrade() {
-    return expressUpgrade;
-  }
-
-  public ComponentEvent setExpressUpgrade(boolean expressUpgrade) {
-    this.expressUpgrade = expressUpgrade;
-    return this;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
index 44d781f..d211f49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -24,6 +24,7 @@ public enum ComponentEventType {
   CONTAINER_RECOVERED,
   CONTAINER_STARTED,
   CONTAINER_COMPLETED,
+  CANCEL_UPGRADE,
   UPGRADE,
   CHECK_STABLE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
index 0f63d03..e1cd0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
@@ -22,5 +22,6 @@ public enum ComponentState {
   INIT,
   FLEXING,
   STABLE,
-  UPGRADING
+  UPGRADING,
+  CANCEL_UPGRADING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index afd8c67..89c9a22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -64,8 +65,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -84,8 +87,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       LoggerFactory.getLogger(ComponentInstance.class);
   private static final String FAILED_BEFORE_LAUNCH_DIAG =
       "failed before launch";
+  private static final String UPGRADE_FAILED = "upgrade failed";
 
-  private  StateMachine<ComponentInstanceState, ComponentInstanceEventType,
+  private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
       ComponentInstanceEvent> stateMachine;
   private Component component;
   private final ReadLock readLock;
@@ -106,7 +110,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
   // This container object is used for rest API query
   private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
   private String serviceVersion;
-
+  private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
+  private boolean pendingCancelUpgrade = false;
 
   private static final StateMachineFactory<ComponentInstance,
       ComponentInstanceState, ComponentInstanceEventType,
@@ -132,13 +137,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       .addTransition(READY, STARTED, BECOME_NOT_READY,
           new ContainerBecomeNotReadyTransition())
       .addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
-      .addTransition(READY, UPGRADING, UPGRADE,
-          new ContainerUpgradeTransition())
-      .addTransition(UPGRADING, UPGRADING, UPGRADE,
-          new ContainerUpgradeTransition())
-      .addTransition(UPGRADING, READY, BECOME_READY,
-          new ContainerBecomeReadyTransition())
-      .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
+      .addTransition(READY, UPGRADING, UPGRADE, new UpgradeTransition())
+      .addTransition(READY, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE,
+          new CancelUpgradeTransition())
+
+      // FROM UPGRADING
+      .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING),
+          CANCEL_UPGRADE, new CancelUpgradeTransition())
+      .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY,
+          new ReadyAfterUpgradeTransition())
+      .addTransition(UPGRADING, UPGRADING, STOP,
+          new StoppedAfterUpgradeTransition())
+
+      // FROM CANCEL_UPGRADING
+      .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY),
+          BECOME_READY, new ReadyAfterUpgradeTransition())
+      .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
+          STOP, new StoppedAfterCancelUpgradeTransition())
       .installTopology();
 
   public ComponentInstance(Component component,
@@ -217,24 +232,53 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
-      compInstance.containerSpec.setState(ContainerState.READY);
-      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
-        compInstance.component.incContainersReady(false);
-        compInstance.component.decContainersThatNeedUpgrade();
-        compInstance.serviceVersion = compInstance.component.getUpgradeEvent()
-            .getUpgradeVersion();
-        ComponentEvent checkState = new ComponentEvent(
-            compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
-        compInstance.scheduler.getDispatcher().getEventHandler().handle(
-            checkState);
+      compInstance.setContainerState(ContainerState.READY);
+      compInstance.component.incContainersReady(true);
+      compInstance.postContainerReady();
+    }
+  }
 
-      } else {
-        compInstance.component.incContainersReady(true);
-      }
-      if (compInstance.timelineServiceEnabled) {
-        compInstance.serviceTimelinePublisher
-            .componentInstanceBecomeReady(compInstance.containerSpec);
+  private static class ReadyAfterUpgradeTransition implements
+      MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+          ComponentInstanceState> {
+
+    @Override
+    public ComponentInstanceState transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+
+      if (instance.pendingCancelUpgrade) {
+        // cancellation of upgrade was triggered before the upgrade was
+        // finished.
+        LOG.info("{} received ready but cancellation pending",
+            event.getContainerId());
+        instance.upgradeInProgress.set(true);
+        instance.cancelUpgrade();
+        instance.pendingCancelUpgrade = false;
+        return instance.getState();
       }
+
+      instance.upgradeInProgress.set(false);
+      instance.setContainerState(ContainerState.READY);
+      instance.component.incContainersReady(false);
+
+      Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
+          instance.component.getUpgradeStatus() :
+          instance.component.getCancelUpgradeStatus();
+      status.decContainersThatNeedUpgrade();
+
+      instance.serviceVersion = status.getTargetVersion();
+      ComponentEvent checkState = new ComponentEvent(
+          instance.component.getName(),
+          ComponentEventType.CHECK_STABLE);
+      instance.scheduler.getDispatcher().getEventHandler().handle(checkState);
+      instance.postContainerReady();
+      return ComponentInstanceState.READY;
+    }
+  }
+
+  private void postContainerReady() {
+    if (timelineServiceEnabled) {
+      serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec);
     }
   }
 
@@ -242,7 +286,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
-      compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
+      compInstance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
       compInstance.component.decContainersReady(true);
     }
   }
@@ -276,11 +320,13 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
           " completed. Reinsert back to pending list and requested ");
       builder.append("a new container.").append(System.lineSeparator());
       builder.append(" exitStatus=").append(
-          failureBeforeLaunch ? null : event.getStatus().getExitStatus());
+          failureBeforeLaunch || event.getStatus() == null ? null :
+              event.getStatus().getExitStatus());
       builder.append(", diagnostics=");
       builder.append(failureBeforeLaunch ?
           FAILED_BEFORE_LAUNCH_DIAG :
-          event.getStatus().getDiagnostics());
+          (event.getStatus() != null ? event.getStatus().getDiagnostics() :
+              UPGRADE_FAILED));
 
       if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
         LOG.error(builder.toString());
@@ -342,15 +388,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         ComponentInstanceEvent event) {
 
       Component comp = compInstance.component;
+      ContainerStatus status = event.getStatus();
+      // status is not available when upgrade fails
       String containerDiag = compInstance.getCompInstanceId() + ": " + (
-          failedBeforeLaunching ?
-              FAILED_BEFORE_LAUNCH_DIAG :
-              event.getStatus().getDiagnostics());
+          failedBeforeLaunching ? FAILED_BEFORE_LAUNCH_DIAG :
+              (status != null ? status.getDiagnostics() : UPGRADE_FAILED));
       compInstance.diagnostics.append(containerDiag + System.lineSeparator());
       compInstance.cancelContainerStatusRetriever();
-      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
-        compInstance.component.decContainersThatNeedUpgrade();
-      }
+
       if (compInstance.getState().equals(READY)) {
         compInstance.component.decContainersReady(true);
       }
@@ -387,10 +432,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
           // record in ATS
           compInstance.scheduler.getServiceTimelinePublisher()
               .componentInstanceFinished(compInstance.getContainer().getId(),
-                  failedBeforeLaunching ?
-                      -1 :
-                      event.getStatus().getExitStatus(), ContainerState.FAILED,
-                  containerDiag);
+                  failedBeforeLaunching || status == null ? -1 :
+                      status.getExitStatus(),
+                  ContainerState.FAILED, containerDiag);
 
           // mark other component-instances/containers as STOPPED
           for (ContainerId containerId : scheduler.getLiveInstances()
@@ -449,28 +493,129 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         .equals(state) || ContainerState.SUCCEEDED.equals(state);
   }
 
-  private static class ContainerUpgradeTransition extends BaseTransition {
+  private static class StoppedAfterUpgradeTransition extends
+      BaseTransition {
 
     @Override
-    public void transition(ComponentInstance compInstance,
+    public void transition(ComponentInstance instance,
         ComponentInstanceEvent event) {
-      if (!compInstance.containerSpec.getState().equals(
-          ContainerState.NEEDS_UPGRADE)) {
-        //nothing to upgrade. this may happen with express upgrade.
+      instance.component.getUpgradeStatus().decContainersThatNeedUpgrade();
+      instance.component.decRunningContainers();
+
+      final ServiceScheduler scheduler = instance.component.getScheduler();
+      scheduler.getAmRMClient().releaseAssignedContainer(
+          event.getContainerId());
+      instance.scheduler.executorService.submit(
+          () -> instance.cleanupRegistry(event.getContainerId()));
+      scheduler.removeLiveCompInstance(event.getContainerId());
+      instance.component.getUpgradeStatus().containerFailedUpgrade();
+      instance.setContainerState(ContainerState.FAILED_UPGRADE);
+      instance.upgradeInProgress.set(false);
+    }
+  }
+
+  private static class StoppedAfterCancelUpgradeTransition implements
+      MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+          ComponentInstanceState> {
+
+    private final ContainerStoppedTransition stoppedTransition =
+        new ContainerStoppedTransition();
+
+    @Override
+    public ComponentInstanceState transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (instance.pendingCancelUpgrade) {
+        // A cancel was requested while the upgrade was still in progress;
+        // start the deferred cancellation now and stay in the current state.
+        LOG.info("{} received stopped but cancellation pending",
+            event.getContainerId());
+        instance.upgradeInProgress.set(true);
+        instance.cancelUpgrade();
+        instance.pendingCancelUpgrade = false;
+        return instance.getState();
+      }
+
+      // When upgrade is cancelled, and container re-init fails
+      instance.component.getCancelUpgradeStatus()
+          .decContainersThatNeedUpgrade();
+      instance.upgradeInProgress.set(false);
+      stoppedTransition.transition(instance, event);
+      return ComponentInstanceState.INIT;
+    }
+  }
+
+  private static class UpgradeTransition extends BaseTransition {
+
+    @Override
+    public void transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (!instance.component.getCancelUpgradeStatus().isCompleted()) {
+        // last check to see if cancellation was triggered. The component may
+        // have processed the cancel upgrade event but the instance doesn't know
+        // it yet. If cancellation has been triggered then no point in
+        // upgrading.
         return;
       }
-      compInstance.containerSpec.setState(ContainerState.UPGRADING);
-      compInstance.component.decContainersReady(false);
-      ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
-      compInstance.scheduler.getContainerLaunchService()
-          .reInitCompInstance(compInstance.scheduler.getApp(), compInstance,
-              compInstance.container,
-              compInstance.component.createLaunchContext(
-                  upgradeEvent.getTargetSpec(),
-                  upgradeEvent.getUpgradeVersion()));
+      instance.upgradeInProgress.set(true);
+      instance.setContainerState(ContainerState.UPGRADING);
+      instance.component.decContainersReady(false);
+
+      Component.UpgradeStatus status = instance.component.getUpgradeStatus();
+      instance.scheduler.getContainerLaunchService()
+          .reInitCompInstance(instance.scheduler.getApp(), instance,
+              instance.container,
+              instance.component.createLaunchContext(
+                  status.getTargetSpec(),
+                  status.getTargetVersion()));
     }
   }
 
+  private static class CancelUpgradeTransition implements
+      MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+          ComponentInstanceState> {
+
+    @Override
+    public ComponentInstanceState transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (instance.upgradeInProgress.compareAndSet(false, true)) {
+
+        Component.UpgradeStatus cancelStatus = instance.component
+            .getCancelUpgradeStatus();
+
+        if (instance.getServiceVersion().equals(
+            cancelStatus.getTargetVersion())) {
+          // previous upgrade didn't happen so just go back to READY
+          LOG.info("{} nothing to cancel", event.getContainerId());
+          cancelStatus.decContainersThatNeedUpgrade();
+          instance.setContainerState(ContainerState.READY);
+          ComponentEvent checkState = new ComponentEvent(
+              instance.component.getName(), ComponentEventType.CHECK_STABLE);
+          instance.scheduler.getDispatcher().getEventHandler()
+              .handle(checkState);
+          return ComponentInstanceState.READY;
+        } else {
+          instance.component.decContainersReady(false);
+          instance.cancelUpgrade();
+        }
+      } else {
+        LOG.info("{} pending cancellation", event.getContainerId());
+        instance.pendingCancelUpgrade = true;
+      }
+      return ComponentInstanceState.CANCEL_UPGRADING;
+    }
+  }
+
+  private void cancelUpgrade() {
+    LOG.info("{} cancelling upgrade", container.getId());
+    setContainerState(ContainerState.UPGRADING);
+    Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
+    scheduler.getContainerLaunchService()
+        .reInitCompInstance(scheduler.getApp(), this,
+            this.container, this.component.createLaunchContext(
+                cancelStatus.getTargetSpec(),
+                cancelStatus.getTargetVersion()));
+  }
+
   public ComponentInstanceState getState() {
     this.readLock.lock();
 
@@ -505,6 +650,26 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  /**
+   * Sets the state of the container in the container spec while holding
+   * the write lock. Updates and logs only when the state actually changes.
+   *
+   * @param state container state
+   */
+  public void setContainerState(ContainerState state) {
+    this.writeLock.lock();
+    try {
+      ContainerState curState = containerSpec.getState();
+      if (!curState.equals(state)) {
+        containerSpec.setState(state);
+        LOG.info("{} spec state changed from {} -> {}",
+            getCompInstanceId(), curState, state);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
   @Override
   public void handle(ComponentInstanceEvent event) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
index 665b8fa..b9181e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
@@ -23,5 +23,6 @@ public enum ComponentInstanceEventType {
   STOP,
   BECOME_READY,
   BECOME_NOT_READY,
-  UPGRADE
+  UPGRADE,
+  CANCEL_UPGRADE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
index f5de5cb..28cbcf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
@@ -22,5 +22,6 @@ public enum ComponentInstanceState {
   INIT,
   STARTED,
   READY,
-  UPGRADING
+  UPGRADING,
+  CANCEL_UPGRADING
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org