You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/09/26 18:53:46 UTC

[1/2] hadoop git commit: YARN-8665. Added Yarn service cancel upgrade option. Contributed by Chandni Singh

Repository: hadoop
Updated Branches:
  refs/heads/trunk e0ff8e2c1 -> 913f87dad


http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index 3c856ec..153ab46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
@@ -113,7 +112,8 @@ public class ContainerLaunchService extends AbstractService{
               .startContainerAsync(container,
                   launcher.completeContainerLaunch());
         } else {
-          LOG.info("reInitializing container {}", container.getId());
+          LOG.info("reInitializing container {} with version {}",
+              container.getId(), componentLaunchContext.getServiceVersion());
           instance.getComponent().getScheduler().getNmClient()
               .reInitializeContainerAsync(container.getId(),
                   launcher.completeContainerLaunch(), true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
index 49ecd2e..6f37967 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
@@ -30,6 +30,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -141,4 +143,15 @@ public class ClientAMProtocolPBClientImpl
     }
     return null;
   }
+
+  @Override
+  public CancelUpgradeResponseProto cancelUpgrade(
+      CancelUpgradeRequestProto request) throws IOException, YarnException {
+    try {
+      return proxy.cancelUpgrade(null, request);
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
index eab3f9f..071c357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -116,4 +118,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public CancelUpgradeResponseProto cancelUpgrade(
+      RpcController controller, CancelUpgradeRequestProto request)
+      throws ServiceException {
+    try {
+      return real.cancelUpgrade(request);
+    } catch (IOException | YarnException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index ac90992..c12c340 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -167,9 +167,8 @@ public class ProviderUtils implements YarnServiceConstants {
   public static Path initCompInstanceDir(SliderFileSystem fs,
       ContainerLaunchService.ComponentLaunchContext compLaunchContext,
       ComponentInstance instance) {
-    Path compDir = new Path(new Path(fs.getAppDir(), "components"),
-        compLaunchContext.getServiceVersion() + "/" +
-            compLaunchContext.getName());
+    Path compDir = fs.getComponentDir(compLaunchContext.getServiceVersion(),
+        compLaunchContext.getName());
     Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
     instance.setCompInstanceDir(compInstanceDir);
     return compInstanceDir;
@@ -184,7 +183,9 @@ public class ProviderUtils implements YarnServiceConstants {
       ServiceContext context) throws IOException {
     Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
     if (!fs.getFileSystem().exists(compInstanceDir)) {
-      log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
+      log.info("{} version {} : Creating dir on hdfs: {}",
+          instance.getCompInstanceId(), compLaunchContext.getServiceVersion(),
+          compInstanceDir);
       fs.getFileSystem().mkdirs(compInstanceDir,
           new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index b588e88..0eb54ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -603,7 +603,7 @@ public class ServiceApiUtil {
   public static void validateInstancesUpgrade(List<Container>
       liveContainers) throws YarnException {
     for (Container liveContainer : liveContainers) {
-      if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
+      if (!isUpgradable(liveContainer)) {
         // Nothing to upgrade
         throw new YarnException(String.format(
             ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
@@ -613,6 +613,16 @@ public class ServiceApiUtil {
   }
 
   /**
+   * Returns whether the container can be upgraded in the current state.
+   */
+  public static boolean isUpgradable(Container container) {
+
+    return container.getState() != null &&
+        (container.getState().equals(ContainerState.NEEDS_UPGRADE) ||
+            container.getState().equals(ContainerState.FAILED_UPGRADE));
+  }
+
+  /**
    * Validates the components that are requested to upgrade require an upgrade.
    * It returns the instances of the components which need upgrade.
    */
@@ -629,7 +639,7 @@ public class ServiceApiUtil {
               ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
         }
         liveComp.getContainers().forEach(liveContainer -> {
-          if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
+          if (isUpgradable(liveContainer)) {
             containerNeedUpgrade.add(liveContainer);
           }
         });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
index d6d664e..c776476 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.utils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -48,4 +50,51 @@ public class SliderFileSystem extends CoreFileSystem {
   public Path getAppDir() {
     return this.appDir;
   }
+
+  /**
+   * Returns the component directory path.
+   *
+   * @param serviceVersion service version
+   * @param compName       component name
+   * @return component directory
+   */
+  public Path getComponentDir(String serviceVersion, String compName) {
+    return new Path(new Path(getAppDir(), "components"),
+        serviceVersion + "/" + compName);
+  }
+
+  /**
+   * Deletes the component directory, recursively, if it exists.
+   *
+   * @param serviceVersion service version
+   * @param compName       component name
+   * @throws IOException if checking for or deleting the directory fails
+   */
+  public void deleteComponentDir(String serviceVersion, String compName)
+      throws IOException {
+    Path path = getComponentDir(serviceVersion, compName);
+    if (fileSystem.exists(path)) {
+      fileSystem.delete(path, true);
+      LOG.debug("deleted dir {}", path);
+    }
+  }
+
+  /**
+   * Deletes the components version directory if it exists and is empty.
+   * @param serviceVersion service version whose components dir is removed
+   * @throws IOException if the emptiness check or the delete fails
+   */
+  public void deleteComponentsVersionDirIfEmpty(String serviceVersion)
+      throws IOException {
+    Path path = new Path(new Path(getAppDir(), "components"), serviceVersion);
+    if (fileSystem.exists(path) && fileSystem.listStatus(path).length == 0) {
+      // non-recursive, so entries created after the check are never removed
+      if (fileSystem.delete(path, false)) {
+        LOG.info("deleted dir {}", path);
+      }
+    }
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SliderFileSystem.class);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
index 169f765..bcf893e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
@@ -28,6 +28,8 @@ service ClientAMProtocolService {
   rpc stop(StopRequestProto) returns (StopResponseProto);
   rpc upgradeService(UpgradeServiceRequestProto)
     returns (UpgradeServiceResponseProto);
+  rpc cancelUpgrade(CancelUpgradeRequestProto)
+    returns (CancelUpgradeResponseProto);
   rpc restartService(RestartServiceRequestProto)
     returns (RestartServiceResponseProto);
   rpc upgrade(CompInstancesUpgradeRequestProto) returns
@@ -73,6 +75,12 @@ message UpgradeServiceResponseProto {
   optional string error = 1;
 }
 
+message CancelUpgradeRequestProto {
+}
+
+message CancelUpgradeResponseProto {
+}
+
 message RestartServiceRequestProto {
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
index 321b2cd..b685f4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
@@ -94,21 +94,25 @@ public class MockRunningServiceContext extends ServiceContext {
         return mockLaunchService;
       }
 
-      @Override public ServiceUtils.ProcessTerminationHandler
-      getTerminationHandler() {
+      @Override
+      public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
         return new
-        ServiceUtils.ProcessTerminationHandler() {
-          public void terminate(int exitCode) {
-          }
-        };
+            ServiceUtils.ProcessTerminationHandler() {
+              public void terminate(int exitCode) {
+              }
+            };
+      }
+
+      @Override
+      protected ServiceManager createServiceManager() {
+        return ServiceTestUtils.createServiceManager(
+            MockRunningServiceContext.this);
       }
     };
 
 
     this.scheduler.init(fsWatcher.getConf());
 
-    ServiceTestUtils.createServiceManager(this);
-
     doNothing().when(mockLaunchService).
         reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
     stabilizeComponents(this);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 6b49ab0..58db752 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -384,6 +384,7 @@ public class ServiceTestUtils {
       conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
       try {
         fs = new SliderFileSystem(conf);
+        fs.setAppDir(new Path(serviceBasePath.toString()));
       } catch (IOException e) {
         Throwables.propagate(e);
       }
@@ -532,7 +533,6 @@ public class ServiceTestUtils {
     GenericTestUtils.waitFor(() -> {
       try {
         Service retrievedApp = client.getStatus(exampleApp.getName());
-        System.out.println(retrievedApp);
         return retrievedApp.getState() == desiredState;
       } catch (Exception e) {
         e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
index a37cabe..406eea4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@@ -65,7 +66,7 @@ public class TestServiceManager {
     initUpgrade(context, "v2", false, false, false);
     ServiceManager manager = context.getServiceManager();
     //make components stable by upgrading all instances
-    upgradeAllInstances(context);
+    upgradeAndReadyAllInstances(context);
 
     context.scheduler.getDispatcher().getEventHandler().handle(
         new ServiceEvent(ServiceEventType.START));
@@ -83,7 +84,7 @@ public class TestServiceManager {
     initUpgrade(context, "v2", false, true, false);
     ServiceManager manager = context.getServiceManager();
     //make components stable by upgrading all instances
-    upgradeAllInstances(context);
+    upgradeAndReadyAllInstances(context);
 
     GenericTestUtils.waitFor(()->
         context.service.getState().equals(ServiceState.STABLE),
@@ -115,7 +116,7 @@ public class TestServiceManager {
         manager.getServiceSpec().getState());
 
     //make components stable by upgrading all instances
-    upgradeAllInstances(context);
+    upgradeAndReadyAllInstances(context);
 
     // finalize service
     context.scheduler.getDispatcher().getEventHandler().handle(
@@ -138,7 +139,7 @@ public class TestServiceManager {
     initUpgrade(context, "v2", true, true, false);
 
     // make components stable
-    upgradeAllInstances(context);
+    upgradeAndReadyAllInstances(context);
 
     GenericTestUtils.waitFor(() ->
         context.service.getState().equals(ServiceState.STABLE),
@@ -174,18 +175,17 @@ public class TestServiceManager {
   public void testExpressUpgrade() throws Exception {
     ServiceContext context = createServiceContext("testExpressUpgrade");
     ServiceManager manager = context.getServiceManager();
-    manager.getServiceSpec().setState(
-        ServiceState.EXPRESS_UPGRADING);
+    manager.getServiceSpec().setState(ServiceState.EXPRESS_UPGRADING);
     initUpgrade(context, "v2", true, true, true);
 
     List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
-    // wait till instances of first component are in upgrade
-    String comp1 = comps.get(0);
-    upgradeInstancesOf(context, comp1);
+    // wait till instances of first component are upgraded and ready
+    String compA = comps.get(0);
+    makeInstancesReadyAfterUpgrade(context, compA);
 
-    // wait till instances of second component are in upgrade
-    String comp2 = comps.get(1);
-    upgradeInstancesOf(context, comp2);
+    // wait till instances of second component are upgraded and ready
+    String compB = comps.get(1);
+    makeInstancesReadyAfterUpgrade(context, compB);
 
     GenericTestUtils.waitFor(() ->
             context.service.getState().equals(ServiceState.STABLE),
@@ -196,6 +196,57 @@ public class TestServiceManager {
     validateUpgradeFinalization(manager.getName(), "v2");
   }
 
+  @Test(timeout = TIMEOUT)
+  public void testCancelUpgrade() throws Exception {
+    ServiceContext context = createServiceContext("testCancelUpgrade");
+    writeInitialDef(context.service);
+    initUpgrade(context, "v2", true, false, false);
+    ServiceManager manager = context.getServiceManager();
+    Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
+        manager.getServiceSpec().getState());
+
+    List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
+    // wait till instances of first component are upgraded and ready
+    String compA = comps.get(0);
+    // upgrade the instances
+    upgradeInstances(context, compA);
+    makeInstancesReadyAfterUpgrade(context, compA);
+
+    // cancel upgrade
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
+    makeInstancesReadyAfterUpgrade(context, compA);
+
+    GenericTestUtils.waitFor(()->
+            context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
+    Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE,
+        manager.getServiceSpec().getState());
+
+    validateUpgradeFinalization(manager.getName(), "v1");
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void testCancelUpgradeAfterInitiate() throws Exception {
+    ServiceContext context = createServiceContext(
+        "testCancelUpgradeAfterInitiate");
+    writeInitialDef(context.service);
+    initUpgrade(context, "v2", true, false, false);
+    ServiceManager manager = context.getServiceManager();
+    Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
+        manager.getServiceSpec().getState());
+    // cancel upgrade before any instance has been upgraded
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
+    GenericTestUtils.waitFor(()->
+            context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
+    Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE,
+        manager.getServiceSpec().getState());
+
+    validateUpgradeFinalization(manager.getName(), "v1");
+  }
+
   private void validateUpgradeFinalization(String serviceName,
       String expectedVersion) throws IOException {
     Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
@@ -225,21 +276,23 @@ public class TestServiceManager {
     }
     writeUpgradedDef(upgradedDef);
     serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
-    ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
-    upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade)
-        .setAutoFinalize(autoFinalize);
-
-    GenericTestUtils.waitFor(()-> {
-      ServiceState serviceState = context.service.getState();
-      if (serviceState.equals(ServiceState.UPGRADING) ||
-          serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
-          serviceState.equals(ServiceState.EXPRESS_UPGRADING)) {
-        return true;
+    GenericTestUtils.waitFor(() -> {
+      for (Component comp : context.scheduler.getAllComponents().values()) {
+        if (!comp.getComponentSpec().getState().equals(
+            ComponentState.NEEDS_UPGRADE)) {
+          return false;
+        }
       }
-      return false;
+      return true;
     }, CHECK_EVERY_MILLIS, TIMEOUT);
   }
 
+  private void upgradeAndReadyAllInstances(ServiceContext context) throws
+      TimeoutException, InterruptedException {
+    upgradeAllInstances(context);
+    makeAllInstancesReady(context);
+  }
+
   private void upgradeAllInstances(ServiceContext context) throws
       TimeoutException, InterruptedException {
     // upgrade the instances
@@ -248,8 +301,10 @@ public class TestServiceManager {
           ComponentInstanceEventType.UPGRADE);
       context.scheduler.getDispatcher().getEventHandler().handle(event);
     }));
+  }
 
-    // become ready
+  private void makeAllInstancesReady(ServiceContext context)
+      throws TimeoutException, InterruptedException {
     context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
       ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
           ComponentInstanceEventType.BECOME_READY);
@@ -267,7 +322,19 @@ public class TestServiceManager {
     }, CHECK_EVERY_MILLIS, TIMEOUT);
   }
 
-  private void upgradeInstancesOf(ServiceContext context, String compName)
+  private void upgradeInstances(ServiceContext context, String compName) {
+    Collection<ComponentInstance> compInstances = context.scheduler
+        .getAllComponents().get(compName).getAllComponentInstances();
+    compInstances.forEach(instance -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.UPGRADE);
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
+    });
+  }
+
+  private void makeInstancesReadyAfterUpgrade(ServiceContext context,
+      String compName)
       throws TimeoutException, InterruptedException {
     Collection<ComponentInstance> compInstances = context.scheduler
         .getAllComponents().get(compName).getAllComponentInstances();
@@ -289,6 +356,15 @@ public class TestServiceManager {
 
       context.scheduler.getDispatcher().getEventHandler().handle(event);
     });
+
+    GenericTestUtils.waitFor(() -> {
+      for (ComponentInstance instance : compInstances) {
+        if (!instance.getContainerState().equals(ContainerState.READY)) {
+          return false;
+        }
+      }
+      return true;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
   }
 
   private ServiceContext createServiceContext(String name)
@@ -324,6 +400,14 @@ public class TestServiceManager {
     return artifact;
   }
 
+  private void writeInitialDef(Service service)
+      throws IOException, SliderException {
+    Path servicePath = rule.getFs().buildClusterDirPath(
+        service.getName());
+    ServiceApiUtil.createDirAndPersistApp(rule.getFs(), servicePath,
+        service);
+  }
+
   private void writeUpgradedDef(Service upgradedDef)
       throws IOException, SliderException {
     Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
@@ -332,6 +416,6 @@ public class TestServiceManager {
         upgradedDef);
   }
 
-  private static final int TIMEOUT = 200000;
+  private static final int TIMEOUT = 10000;
   private static final int CHECK_EVERY_MILLIS = 100;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 216d88f..3e23a10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
@@ -450,6 +451,49 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     client.actionDestroy(service.getName());
   }
 
+  @Test(timeout = 200000)
+  public void testCancelUpgrade() throws Exception {
+    setupInternal(NUM_NMS);
+    getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true);
+    ServiceClient client = createClient(getConf());
+
+    Service service = createExampleApplication();
+    Component component = service.getComponents().iterator().next();
+    component.getConfiguration().getEnv().put("key1", "val0");
+
+    client.actionCreate(service);
+    waitForServiceToBeStable(client, service);
+
+    // upgrade the service
+    service.setState(ServiceState.UPGRADING);
+    service.setVersion("v2");
+    component.getConfiguration().getEnv().put("key1", "val1");
+    client.initiateUpgrade(service);
+
+    // wait for service to be in upgrade state
+    waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
+
+    // upgrade 1 container
+    Service liveService = client.getStatus(service.getName());
+    Container container = liveService.getComponent(component.getName())
+        .getContainers().iterator().next();
+    client.actionUpgrade(service, Lists.newArrayList(container));
+    // presumably lets the container upgrade start before it is cancelled
+    Thread.sleep(500);
+    // cancel the upgrade
+    client.actionCancelUpgrade(service.getName());
+    waitForServiceToBeStable(client, service);
+    Service active = client.getStatus(service.getName());
+    Assert.assertEquals("component not stable", ComponentState.STABLE,
+        active.getComponent(component.getName()).getState());
+    Assert.assertEquals("comp does not have old env", "val0",
+        active.getComponent(component.getName()).getConfiguration()
+            .getEnv("key1"));
+    LOG.info("Stop/destroy service {}", service);
+    client.actionStop(service.getName(), true);
+    client.actionDestroy(service.getName());
+  }
+
   // Test to verify ANTI_AFFINITY placement policy
   // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
   // 2. Create an example service with 3 containers

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
index 0e047c2..41be8c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
@@ -221,6 +221,17 @@ public class TestServiceCLI {
     Assert.assertEquals(result, 0);
   }
 
+  @Test
+  public void testCancelUpgrade() throws Exception {
+    conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
+        DummyServiceClient.class.getName());
+    cli.setConf(conf);
+    String[] args = {"app", "-upgrade", "app-1",
+        "-cancel", "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    Assert.assertEquals(0, result);
+  }
+
   @Test (timeout = 180000)
   public void testEnableFastLaunch() throws Exception {
     fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
@@ -332,5 +343,11 @@ public class TestServiceCLI {
         throws IOException, YarnException {
       return "";
     }
+
+    @Override
+    public int actionCancelUpgrade(String appName) throws IOException,
+        YarnException {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
index e1a4d9d..f11d871 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
@@ -19,8 +19,8 @@
 package org.apache.hadoop.yarn.service.component;
 
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.MockRunningServiceContext;
 import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.ServiceTestUtils;
 import org.apache.hadoop.yarn.service.TestServiceManager;
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
-import org.apache.hadoop.yarn.service.MockRunningServiceContext;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -38,9 +37,8 @@ import org.junit.Test;
 
 import java.util.Iterator;
 
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link Component}.
@@ -78,14 +76,14 @@ public class TestComponent {
             "val1")).setUpgradeVersion("v2"));
 
     // one instance finished upgrading
-    comp.decContainersThatNeedUpgrade();
+    comp.getUpgradeStatus().decContainersThatNeedUpgrade();
     comp.handle(new ComponentEvent(comp.getName(),
         ComponentEventType.CHECK_STABLE));
     Assert.assertEquals("component not in need upgrade state",
         ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
 
     // second instance finished upgrading
-    comp.decContainersThatNeedUpgrade();
+    comp.getUpgradeStatus().decContainersThatNeedUpgrade();
     comp.handle(new ComponentEvent(comp.getName(),
         ComponentEventType.CHECK_STABLE));
 
@@ -97,7 +95,7 @@ public class TestComponent {
 
   @Test
   public void testContainerCompletedWhenUpgrading() throws Exception {
-    String serviceName = "testContainerComplete";
+    String serviceName = "testContainerCompletedWhenUpgrading";
     MockRunningServiceContext context = createTestContext(rule, serviceName);
     Component comp = context.scheduler.getAllComponents().entrySet().iterator()
         .next().getValue();
@@ -105,48 +103,233 @@ public class TestComponent {
     comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
         .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
             "val1")).setUpgradeVersion("v2"));
-    comp.getAllComponentInstances().forEach(instance -> {
-      instance.handle(new ComponentInstanceEvent(
-          instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
-    });
-    Iterator<ComponentInstance> instanceIter = comp.
-        getAllComponentInstances().iterator();
+    comp.getAllComponentInstances().forEach(instance ->
+        instance.handle(new ComponentInstanceEvent(
+        instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)));
 
     // reinitialization of a container failed
-    ContainerStatus status = mock(ContainerStatus.class);
-    when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED);
-    ComponentInstance instance = instanceIter.next();
+    for(ComponentInstance instance : comp.getAllComponentInstances()) {
+      ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
+          ComponentEventType.CONTAINER_COMPLETED)
+          .setInstance(instance)
+          .setContainerId(instance.getContainer().getId());
+      comp.handle(stopEvent);
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(), STOP));
+    }
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in needs upgrade state",
+        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+  }
+
+  @Test
+  public void testCancelUpgrade() throws Exception {
+    ServiceContext context = createTestContext(rule, "testCancelUpgrade");
+    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+        .next().getValue();
+    // cancel is sent without any prior UPGRADE event
+    ComponentEvent cancelEvent = new ComponentEvent(comp.getName(),
+        ComponentEventType.CANCEL_UPGRADE);
+    comp.handle(cancelEvent);
+    Assert.assertEquals("component not in need upgrade state",
+        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+
+    Assert.assertEquals("component not in cancel upgrading state",
+        org.apache.hadoop.yarn.service.component.ComponentState
+            .CANCEL_UPGRADING, comp.getState());
+  }
+
+  @Test
+  public void testContainerCompletedCancelUpgrade() throws Exception {
+    String serviceName = "testContainerCompletedCancelUpgrade";
+    MockRunningServiceContext context = createTestContext(rule, serviceName);
+    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+        .next().getValue();
+
+    // upgrade completes
+    comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
+        .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
+            "val1")).setUpgradeVersion("v2"));
+    comp.getAllComponentInstances().forEach(instance ->
+        instance.handle(new ComponentInstanceEvent(
+            instance.getContainer().getId(),
+            ComponentInstanceEventType.UPGRADE)));
+
+    // reinitialization of a container done
+    for(ComponentInstance instance : comp.getAllComponentInstances()) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(), BECOME_READY));
+    }
+
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CANCEL_UPGRADE)
+        .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
+            "val0")).setUpgradeVersion("v1"));
+    comp.getAllComponentInstances().forEach(instance ->
+        instance.handle(new ComponentInstanceEvent(
+            instance.getContainer().getId(),
+            ComponentInstanceEventType.CANCEL_UPGRADE)));
+
+    Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
+        .iterator();
+
+    // cancel upgrade failed of a container
+    ComponentInstance instance1 = iter.next();
     ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
         ComponentEventType.CONTAINER_COMPLETED)
-        .setInstance(instance).setContainerId(instance.getContainer().getId())
-        .setStatus(status);
+        .setInstance(instance1)
+        .setContainerId(instance1.getContainer().getId());
     comp.handle(stopEvent);
-    instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
-        STOP).setStatus(status));
+    instance1.handle(new ComponentInstanceEvent(
+        instance1.getContainer().getId(), STOP));
+    Assert.assertEquals(
+        org.apache.hadoop.yarn.service.component.ComponentState
+            .CANCEL_UPGRADING, comp.getState());
 
     comp.handle(new ComponentEvent(comp.getName(),
         ComponentEventType.CHECK_STABLE));
 
-    Assert.assertEquals("component not in flexing state",
-        ComponentState.FLEXING, comp.getComponentSpec().getState());
-
-    // new container get allocated
-    context.assignNewContainer(context.attemptId, 10, comp);
+    Assert.assertEquals("component not in needs upgrade state",
+        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+    Assert.assertEquals(
+        org.apache.hadoop.yarn.service.component.ComponentState
+            .CANCEL_UPGRADING, comp.getState());
 
     // second instance finished upgrading
-    ComponentInstance instance2 = instanceIter.next();
+    ComponentInstance instance2 = iter.next();
     instance2.handle(new ComponentInstanceEvent(
         instance2.getContainer().getId(),
         ComponentInstanceEventType.BECOME_READY));
+
     comp.handle(new ComponentEvent(comp.getName(),
         ComponentEventType.CHECK_STABLE));
 
+    Assert.assertEquals("component not in flexing state",
+        ComponentState.FLEXING, comp.getComponentSpec().getState());
+    // new container get allocated
+    context.assignNewContainer(context.attemptId, 10, comp);
+
+    comp.handle(new ComponentEvent(comp.getName(),
+            ComponentEventType.CHECK_STABLE));
+
     Assert.assertEquals("component not in stable state",
         ComponentState.STABLE, comp.getComponentSpec().getState());
-    Assert.assertEquals("component did not upgrade successfully", "val1",
+    Assert.assertEquals("cancel upgrade failed", "val0",
+        comp.getComponentSpec().getConfiguration().getEnv("key1"));
+  }
+
+  @Test
+  public void testCancelUpgradeSuccessWhileUpgrading() throws Exception {
+    String serviceName = "testCancelUpgradeSuccessWhileUpgrading";
+    MockRunningServiceContext context = createTestContext(rule, serviceName);
+    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+        .next().getValue();
+    cancelUpgradeWhileUpgrading(context, comp);
+
+    // cancel upgrade successful for both instances
+    for (ComponentInstance instance : comp.getAllComponentInstances()) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.BECOME_READY));
+    }
+
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in stable state",
+        ComponentState.STABLE, comp.getComponentSpec().getState());
+    Assert.assertEquals("cancel upgrade failed", "val0",
+        comp.getComponentSpec().getConfiguration().getEnv("key1"));
+  }
+
+  @Test
+  public void testCancelUpgradeFailureWhileUpgrading() throws Exception {
+    String serviceName = "testCancelUpgradeFailureWhileUpgrading";
+    MockRunningServiceContext context = createTestContext(rule, serviceName);
+    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+        .next().getValue();
+    cancelUpgradeWhileUpgrading(context, comp);
+
+    // cancel upgrade failed for both instances
+    for (ComponentInstance instance : comp.getAllComponentInstances()) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.STOP));
+    }
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in flexing state",
+        ComponentState.FLEXING, comp.getComponentSpec().getState());
+
+    for (ComponentInstance ignored : comp.getAllComponentInstances()) {
+      // a new container gets allocated once per instance
+      context.assignNewContainer(context.attemptId, 10, comp);
+    }
+
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in stable state",
+        ComponentState.STABLE, comp.getComponentSpec().getState());
+    Assert.assertEquals("cancel upgrade failed", "val0",
         comp.getComponentSpec().getConfiguration().getEnv("key1"));
   }
 
+  private void cancelUpgradeWhileUpgrading(
+      MockRunningServiceContext context, Component comp)
+      throws Exception {
+    // Upgrades comp, starts upgrading instance1, then cancels the upgrade.
+    comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
+        .setTargetSpec(createSpecWithEnv(context.service.getName(),
+            comp.getName(), "key1", "val1")).setUpgradeVersion("v0"));
+
+    Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
+        .iterator();
+
+    ComponentInstance instance1 = iter.next();
+
+    // instance1 is triggered to upgrade
+    instance1.handle(new ComponentInstanceEvent(
+        instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
+
+    // component upgrade is cancelled, targeting key1=val0 again
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CANCEL_UPGRADE)
+        .setTargetSpec(createSpecWithEnv(context.service.getName(),
+            comp.getName(), "key1",
+            "val0")).setUpgradeVersion("v0"));
+
+    // every instance is told to cancel its upgrade
+    comp.getAllComponentInstances().forEach(instance ->
+        instance.handle(new ComponentInstanceEvent(
+            instance.getContainer().getId(),
+            ComponentInstanceEventType.CANCEL_UPGRADE)));
+
+    // instance1's regular upgrade fails: its container completes and stops
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1)
+        .setContainerId(instance1.getContainer().getId()));
+    instance1.handle(new ComponentInstanceEvent(
+        instance1.getContainer().getId(), STOP));
+
+    // component must stay CANCEL_UPGRADING despite the failed upgrade
+    Assert.assertEquals(
+        org.apache.hadoop.yarn.service.component.ComponentState
+            .CANCEL_UPGRADING, comp.getState());
+
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in needs upgrade state",
+        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+    Assert.assertEquals(
+        org.apache.hadoop.yarn.service.component.ComponentState
+            .CANCEL_UPGRADING, comp.getState());
+  }
+
   @Test
   public void testComponentStateReachesStableStateWithTerminatingComponents()
       throws
@@ -249,8 +432,6 @@ public class TestComponent {
         serviceState);
   }
 
-
-
   private static org.apache.hadoop.yarn.service.api.records.Component
       createSpecWithEnv(String serviceName, String compName, String key,
       String val) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
index e039981..c5a9631 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.component.instance;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -86,7 +87,7 @@ public class TestComponentInstance {
   @Test
   public void testContainerReadyAfterUpgrade() throws Exception {
     ServiceContext context = TestComponent.createTestContext(rule,
-        "testContainerStarted");
+        "testContainerReadyAfterUpgrade");
     Component component = context.scheduler.getAllComponents().entrySet()
         .iterator().next().getValue();
     upgradeComponent(component);
@@ -105,12 +106,186 @@ public class TestComponentInstance {
             .getId().toString()).getState());
   }
 
+
+  @Test
+  public void testContainerUpgradeFailed() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testContainerUpgradeFailed");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    upgradeComponent(component);
+
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+
+    ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+    instance.handle(upgradeEvent);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+    when(containerStatus.getExitStatus()).thenReturn(
+        ContainerExitStatus.ABORTED);
+    ComponentInstanceEvent stopEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(), ComponentInstanceEventType.STOP)
+        .setStatus(containerStatus);
+    // this is the call back from NM for the upgrade
+    instance.handle(stopEvent);
+    Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+  }
+
+  @Test
+  public void testCancelNothingToUpgrade() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testCancelNothingToUpgrade");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    cancelCompUpgrade(component);
+    // no upgrade was started, so the instance has nothing to cancel
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+
+    ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(),
+        ComponentInstanceEventType.CANCEL_UPGRADE);
+    instance.handle(cancelEvent);
+
+    Assert.assertEquals("instance not ready", ContainerState.READY,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+  }
+
+  @Test
+  public void testCancelUpgradeFailed() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testCancelUpgradeFailed");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    cancelCompUpgrade(component);
+
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+
+    ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(),
+        ComponentInstanceEventType.CANCEL_UPGRADE);
+    instance.handle(cancelEvent);
+
+    instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
+        ComponentInstanceEventType.STOP));
+    Assert.assertEquals("instance not init", ComponentInstanceState.INIT,
+        instance.getState());
+  }
+
+  @Test
+  public void testCancelAfterCompProcessedCancel() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testCancelAfterCompProcessedCancel");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    upgradeComponent(component);
+    cancelCompUpgrade(component);
+    // comp already processed the cancel before the instance sees UPGRADE
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+    ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+    instance.handle(upgradeEvent);
+
+    Assert.assertEquals("instance should not start upgrading",
+        ContainerState.NEEDS_UPGRADE,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+  }
+
+  @Test
+  public void testCancelWhileUpgradeWithSuccess() throws Exception {
+    validateCancelWhileUpgrading(true, true);
+  }
+
+  @Test
+  public void testCancelWhileUpgradeWithFailure() throws Exception {
+    validateCancelWhileUpgrading(false, true);
+  }
+
+  @Test
+  public void testCancelFailedWhileUpgradeWithSuccess() throws Exception {
+    validateCancelWhileUpgrading(true, false);
+  }
+
+  @Test
+  public void testCancelFailedWhileUpgradeWithFailure() throws Exception {
+    validateCancelWhileUpgrading(false, false);
+  }
+
+  private void validateCancelWhileUpgrading(boolean upgradeSuccessful,
+      boolean cancelUpgradeSuccessful)
+      throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testCancelWhileUpgrading");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    upgradeComponent(component);
+
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+    ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+    instance.handle(upgradeEvent);
+
+    Assert.assertEquals("instance should be upgrading",
+        ContainerState.UPGRADING,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+
+    cancelCompUpgrade(component);
+    ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(),
+        ComponentInstanceEventType.CANCEL_UPGRADE);
+    instance.handle(cancelEvent);
+
+    // either upgrade failed or successful
+    ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(),
+        upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
+            ComponentInstanceEventType.STOP);
+
+    instance.handle(readyOrStopEvent);
+    Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+
+    // response for cancel received
+    ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent(
+        instance.getContainer().getId(),
+        cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
+            ComponentInstanceEventType.STOP);
+
+    instance.handle(readyOrStopCancel);
+    if (cancelUpgradeSuccessful) {
+      Assert.assertEquals("instance not ready", ContainerState.READY,
+          component.getComponentSpec().getContainer(instance.getContainer()
+              .getId().toString()).getState());
+    } else {
+      Assert.assertEquals("instance not init", ComponentInstanceState.INIT,
+          instance.getState());
+    }
+  }
+
   private void upgradeComponent(Component component) {
     component.handle(new ComponentEvent(component.getName(),
         ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec())
         .setUpgradeVersion("v2"));
   }
 
+  private void cancelCompUpgrade(Component component) {
+    component.handle(new ComponentEvent(component.getName(),
+        ComponentEventType.CANCEL_UPGRADE)
+        .setTargetSpec(component.getComponentSpec())
+        .setUpgradeVersion("v1"));
+  }
+
   private Component createComponent(ServiceScheduler scheduler,
       org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
           restartPolicy, int nSucceededInstances, int nFailedInstances,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..81a3f6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index a0e4e02..b0e12bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -102,6 +102,7 @@ public class ApplicationCLI extends YarnCLI {
   public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch";
   public static final String UPGRADE_CMD = "upgrade";
   public static final String UPGRADE_EXPRESS = "express";
+  public static final String UPGRADE_CANCEL = "cancel";
   public static final String UPGRADE_INITIATE = "initiate";
   public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
   public static final String UPGRADE_FINALIZE = "finalize";
@@ -265,6 +266,8 @@ public class ApplicationCLI extends YarnCLI {
       opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
           "-initiate options to initiate the upgrade of the application with " +
           "the ability to finalize the upgrade automatically.");
+      opts.addOption(UPGRADE_CANCEL, false, "Works with -upgrade option to " +
+          "cancel current upgrade.");
       opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
       opts.getOption(LAUNCH_CMD).setArgs(2);
       opts.getOption(START_CMD).setArgName("Application Name");
@@ -646,7 +649,7 @@ public class ApplicationCLI extends YarnCLI {
     } else if (cliParser.hasOption(UPGRADE_CMD)) {
       if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS,
           UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE,
-          COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
+          UPGRADE_CANCEL, COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
         printUsage(title, opts);
         return exitCode;
       }
@@ -697,6 +700,13 @@ public class ApplicationCLI extends YarnCLI {
           return exitCode;
         }
         return client.actionStart(appName);
+      } else if (cliParser.hasOption(UPGRADE_CANCEL)) {
+        if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
+            UPGRADE_CANCEL, APP_TYPE_CMD)) {
+          printUsage(title, opts);
+          return exitCode;
+        }
+        return client.actionCancelUpgrade(appName);
       }
     } else {
       syserr.println("Invalid Command Usage : ");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index a600895..f795db5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -2143,6 +2143,8 @@ public class TestYarnCLI {
     pw.println("                                          the upgrade of the application");
     pw.println("                                          with the ability to finalize the");
     pw.println("                                          upgrade automatically.");
+    pw.println(" -cancel                                  Works with -upgrade option to");
+    pw.println("                                          cancel current upgrade.");
     pw.println(" -changeQueue <Queue Name>                Moves application to a new");
     pw.println("                                          queue. ApplicationId can be");
     pw.println("                                          passed using 'appId' option.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
index 232666d..df11ffd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
@@ -300,4 +300,17 @@ public abstract class AppAdminClient extends CompositeService {
   @Unstable
   public abstract int actionUpgradeExpress(String appName, File fileName)
       throws IOException, YarnException;
+
+  /**
+   * Cancels the upgrade of the service.
+   *
+   * @param appName the name of the application
+   * @return exit code
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  @Unstable
+  public abstract int actionCancelUpgrade(String appName) throws IOException,
+      YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 27a7c80..01d70af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -1836,6 +1836,7 @@ public class ContainerManagerImpl extends CompositeService implements
   public void reInitializeContainer(ContainerId containerId,
       ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
       throws YarnException {
+    LOG.debug("{} requested reinit", containerId);
     Container container = preReInitializeOrLocalizeCheck(containerId,
         ReInitOp.RE_INIT);
     ResourceSet resourceSet = new ResourceSet();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-8665. Added Yarn service cancel upgrade option. Contributed by Chandni Singh

Posted by ey...@apache.org.
YARN-8665.  Added Yarn service cancel upgrade option.
            Contributed by Chandni Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/913f87da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/913f87da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/913f87da

Branch: refs/heads/trunk
Commit: 913f87dada27776c539dfb352400ecf8d40e7943
Parents: e0ff8e2
Author: Eric Yang <ey...@apache.org>
Authored: Wed Sep 26 14:51:35 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Wed Sep 26 14:51:35 2018 -0400

----------------------------------------------------------------------
 .../yarn/service/client/ApiServiceClient.java   |  20 ++
 .../hadoop/yarn/service/webapp/ApiServer.java   |  34 +-
 .../hadoop/yarn/service/ClientAMProtocol.java   |   5 +
 .../hadoop/yarn/service/ClientAMService.java    |  12 +
 .../hadoop/yarn/service/ServiceEvent.java       |  14 +-
 .../hadoop/yarn/service/ServiceEventType.java   |   3 +-
 .../hadoop/yarn/service/ServiceManager.java     | 331 +++++++++++++------
 .../service/api/records/ContainerState.java     |   2 +-
 .../yarn/service/api/records/ServiceState.java  |   7 +-
 .../yarn/service/client/ServiceClient.java      |  21 ++
 .../yarn/service/component/Component.java       | 275 ++++++++++-----
 .../yarn/service/component/ComponentEvent.java  |  10 -
 .../service/component/ComponentEventType.java   |   1 +
 .../yarn/service/component/ComponentState.java  |   3 +-
 .../component/instance/ComponentInstance.java   | 269 ++++++++++++---
 .../instance/ComponentInstanceEventType.java    |   3 +-
 .../instance/ComponentInstanceState.java        |   3 +-
 .../containerlaunch/ContainerLaunchService.java |   4 +-
 .../pb/client/ClientAMProtocolPBClientImpl.java |  13 +
 .../service/ClientAMProtocolPBServiceImpl.java  |  13 +
 .../yarn/service/provider/ProviderUtils.java    |   9 +-
 .../yarn/service/utils/ServiceApiUtil.java      |  14 +-
 .../yarn/service/utils/SliderFileSystem.java    |  49 +++
 .../src/main/proto/ClientAMProtocol.proto       |   8 +
 .../yarn/service/MockRunningServiceContext.java |  20 +-
 .../hadoop/yarn/service/ServiceTestUtils.java   |   2 +-
 .../hadoop/yarn/service/TestServiceManager.java | 136 ++++++--
 .../yarn/service/TestYarnNativeServices.java    |  44 +++
 .../yarn/service/client/TestServiceCLI.java     |  17 +
 .../yarn/service/component/TestComponent.java   | 239 +++++++++++--
 .../instance/TestComponentInstance.java         | 177 +++++++++-
 .../src/test/resources/log4j.properties         |  19 ++
 .../hadoop/yarn/client/cli/ApplicationCLI.java  |  12 +-
 .../hadoop/yarn/client/cli/TestYarnCLI.java     |   2 +
 .../hadoop/yarn/client/api/AppAdminClient.java  |  13 +
 .../containermanager/ContainerManagerImpl.java  |   1 +
 36 files changed, 1470 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index ca6cc50..b7a1541 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -723,4 +723,24 @@ public class ApiServiceClient extends AppAdminClient {
     }
     return null;
   }
+
+  @Override
+  public int actionCancelUpgrade(
+      String appName) throws IOException, YarnException {
+    int result;
+    try {
+      Service service = new Service();
+      service.setName(appName);
+      service.setState(ServiceState.CANCEL_UPGRADING);
+      String buffer = jsonSerDeser.toJson(service);
+      LOG.info("Cancel upgrade in progress. Please wait..");
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Failed to cancel upgrade: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index cd6f0d7..c4e3317 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -58,6 +58,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
+import static org.apache.hadoop.yarn.service.api.records.ServiceState.CANCEL_UPGRADING;
 import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
 import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
 
@@ -445,6 +446,12 @@ public class ApiServer {
         return upgradeService(updateServiceData, ugi);
       }
 
+      // If CANCEL_UPGRADING is requested
+      if (updateServiceData.getState() != null &&
+          updateServiceData.getState() == CANCEL_UPGRADING) {
+        return cancelUpgradeService(appName, ugi);
+      }
+
       // If new lifetime value specified then update it
       if (updateServiceData.getLifetime() != null
           && updateServiceData.getLifetime() > 0) {
@@ -460,8 +467,7 @@ public class ApiServer {
       LOG.error(message, e);
       return formatResponse(Status.NOT_FOUND, e.getMessage());
     } catch (YarnException e) {
-      String message = "Service is not found in hdfs: " + appName;
-      LOG.error(message, e);
+      LOG.error(e.getMessage(), e);
       return formatResponse(Status.NOT_FOUND, e.getMessage());
     } catch (Exception e) {
       String message = "Error while performing operation for app: " + appName;
@@ -707,6 +713,27 @@ public class ApiServer {
     return formatResponse(Status.ACCEPTED, status);
   }
 
+  private Response cancelUpgradeService(String serviceName,
+      final UserGroupInformation ugi) throws IOException, InterruptedException {
+    int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      int exitCode = sc.actionCancelUpgrade(serviceName);
+      sc.close();
+      return exitCode;
+    });
+    if (result == EXIT_SUCCESS) {
+      ServiceStatus status = new ServiceStatus();
+      LOG.info("Service {} cancelling upgrade", serviceName);
+      status.setDiagnostics("Service " + serviceName +
+          " cancelling upgrade.");
+      status.setState(ServiceState.ACCEPTED);
+      return formatResponse(Status.ACCEPTED, status);
+    }
+    return Response.status(Status.BAD_REQUEST).build();
+  }
+
   private Response processComponentsUpgrade(UserGroupInformation ugi,
       String serviceName, Set<String> compNames) throws YarnException,
       IOException, InterruptedException {
@@ -734,7 +761,8 @@ public class ApiServer {
       Service service, List<Container> containers) throws YarnException,
       IOException, InterruptedException {
 
-    if (service.getState() != ServiceState.UPGRADING) {
+    if (!service.getState().equals(ServiceState.UPGRADING) &&
+        !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
       throw new YarnException(
           String.format("The upgrade of service %s has not been initiated.",
               service.getName()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
index 652a314..39e7dfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -60,4 +62,7 @@ public interface ClientAMProtocol {
 
   GetCompInstancesResponseProto getCompInstances(
       GetCompInstancesRequestProto request) throws IOException, YarnException;
+
+  CancelUpgradeResponseProto cancelUpgrade(
+      CancelUpgradeRequestProto request) throws IOException, YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 2ef8f7e..47e9829 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
@@ -208,4 +210,14 @@ public class ClientAMService extends AbstractService
         ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
             new Container[containers.size()]))).build();
   }
+
+  @Override
+  public CancelUpgradeResponseProto cancelUpgrade(
+      CancelUpgradeRequestProto request) throws IOException, YarnException {
+    LOG.info("Cancel service upgrade by {}",
+        UserGroupInformation.getCurrentUser());
+    ServiceEvent event = new ServiceEvent(ServiceEventType.CANCEL_UPGRADE);
+    context.scheduler.getDispatcher().getEventHandler().handle(event);
+    return CancelUpgradeResponseProto.newBuilder().build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
index 3a55472..cf4455d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.service;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.service.api.records.Component;
 
-import java.util.Queue;
+import java.util.List;
 
 /**
  * Events are handled by {@link ServiceManager} to manage the service
@@ -33,7 +33,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
   private String version;
   private boolean autoFinalize;
   private boolean expressUpgrade;
-  private Queue<Component> compsToUpgradeInOrder;
+  // For express upgrade they should be in order.
+  private List<Component> compsToUpgrade;
 
   public ServiceEvent(ServiceEventType serviceEventType) {
     super(serviceEventType);
@@ -71,13 +72,12 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
     return this;
   }
 
-  public Queue<Component> getCompsToUpgradeInOrder() {
-    return compsToUpgradeInOrder;
+  public List<Component> getCompsToUpgrade() {
+    return compsToUpgrade;
   }
 
-  public ServiceEvent setCompsToUpgradeInOrder(
-      Queue<Component> compsToUpgradeInOrder) {
-    this.compsToUpgradeInOrder = compsToUpgradeInOrder;
+  public ServiceEvent setCompsToUpgrade(List<Component> compsToUpgrade) {
+    this.compsToUpgrade = compsToUpgrade;
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
index 4fc420b..03afdd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
@@ -24,5 +24,6 @@ package org.apache.hadoop.yarn.service;
 public enum ServiceEventType {
   START,
   UPGRADE,
-  CHECK_STABLE
+  CHECK_STABLE,
+  CANCEL_UPGRADE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index 04454b1..4851325 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -20,32 +20,31 @@ package org.apache.hadoop.yarn.service;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
 import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
-import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
-import org.apache.hadoop.yarn.state.MultipleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
-import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.state.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@@ -71,8 +70,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
   private final SliderFileSystem fs;
 
   private String upgradeVersion;
-  private Queue<org.apache.hadoop.yarn.service.api.records
-        .Component> compsToUpgradeInOrder;
+  private List<org.apache.hadoop.yarn.service.api.records
+      .Component> componentsToUpgrade;
+  private List<String> compsAffectedByUpgrade = new ArrayList<>();
+  private String cancelledVersion;
 
   private static final StateMachineFactory<ServiceManager, State,
       ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@@ -88,11 +89,14 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
 
           .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
               State.UPGRADING), ServiceEventType.START,
-              new CheckStableTransition())
+              new StartFromUpgradeTransition())
 
           .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
               State.UPGRADING), ServiceEventType.CHECK_STABLE,
               new CheckStableTransition())
+
+          .addTransition(State.UPGRADING, State.UPGRADING,
+              ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition())
           .installTopology();
 
   public ServiceManager(ServiceContext context) {
@@ -148,19 +152,25 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
       serviceManager.upgradeVersion = event.getVersion();
+      serviceManager.componentsToUpgrade = event.getCompsToUpgrade();
+      event.getCompsToUpgrade().forEach(comp ->
+          serviceManager.compsAffectedByUpgrade.add(comp.getName()));
       try {
         if (event.isExpressUpgrade()) {
-          serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
-          serviceManager.compsToUpgradeInOrder = event
-              .getCompsToUpgradeInOrder();
-          serviceManager.upgradeNextCompIfAny();
+          serviceManager.dispatchNeedUpgradeEvents(false);
+          serviceManager.upgradeNextCompIfAny(false);
+        } else {
+          serviceManager.dispatchNeedUpgradeEvents(false);
+        }
+
+        if (event.isExpressUpgrade()) {
+          serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING);
         } else if (event.isAutoFinalize()) {
-          serviceManager.serviceSpec.setState(ServiceState
-              .UPGRADING_AUTO_FINALIZE);
+          serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE);
         } else {
-          serviceManager.serviceSpec.setState(
-              ServiceState.UPGRADING);
+          serviceManager.setServiceState(ServiceState.UPGRADING);
         }
+
         return State.UPGRADING;
       } catch (Throwable e) {
         LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@@ -181,24 +191,32 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       if (currState.equals(ServiceState.STABLE)) {
         return State.STABLE;
       }
-      if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
-        org.apache.hadoop.yarn.service.api.records.Component component =
-            serviceManager.compsToUpgradeInOrder.peek();
-        if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
-            !component.getState().equals(ComponentState.UPGRADING)) {
-          serviceManager.compsToUpgradeInOrder.remove();
+      if (currState.equals(ServiceState.EXPRESS_UPGRADING) ||
+          currState.equals(ServiceState.CANCEL_UPGRADING)) {
+
+        if (!serviceManager.componentsToUpgrade.isEmpty()) {
+          org.apache.hadoop.yarn.service.api.records.Component compSpec =
+              serviceManager.componentsToUpgrade.get(0);
+          Component component = serviceManager.scheduler.getAllComponents()
+              .get(compSpec.getName());
+
+          if (!component.isUpgrading()) {
+            serviceManager.componentsToUpgrade.remove(0);
+            serviceManager.upgradeNextCompIfAny(
+                currState.equals(ServiceState.CANCEL_UPGRADING));
+          }
         }
-        serviceManager.upgradeNextCompIfAny();
       }
+
       if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
-          event.getType().equals(ServiceEventType.START) ||
-          (currState.equals(ServiceState.EXPRESS_UPGRADING) &&
-              serviceManager.compsToUpgradeInOrder.isEmpty())) {
+          ((currState.equals(ServiceState.EXPRESS_UPGRADING) ||
+              currState.equals(ServiceState.CANCEL_UPGRADING)) &&
+              serviceManager.componentsToUpgrade.isEmpty())) {
+
         ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
         if (targetState.equals(ServiceState.STABLE)) {
-          if (serviceManager.finalizeUpgrade()) {
-            LOG.info("Service def state changed from {} -> {}", currState,
-                serviceManager.serviceSpec.getState());
+          if (serviceManager.finalizeUpgrade(
+              currState.equals(ServiceState.CANCEL_UPGRADING))) {
             return State.STABLE;
           }
         }
@@ -207,52 +225,149 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     }
   }
 
-  private void upgradeNextCompIfAny() {
-    if (!compsToUpgradeInOrder.isEmpty()) {
+  /**
+   * Transition that finalizes a pending upgrade (or upgrade cancellation)
+   * when {@code checkIfStable} reports the service spec as STABLE.
+   * Returns STABLE if finalization succeeds, otherwise stays in UPGRADING.
+   */
+  private static class StartFromUpgradeTransition implements
+      MultipleArcTransition<ServiceManager, ServiceEvent, State> {
+
+    @Override
+    public State transition(ServiceManager serviceManager, ServiceEvent event) {
+      // capture the spec state before the stability check runs
+      ServiceState specState = serviceManager.serviceSpec.getState();
+      boolean stable = checkIfStable(serviceManager.serviceSpec)
+          .equals(ServiceState.STABLE);
+      if (stable && serviceManager.finalizeUpgrade(
+          specState.equals(ServiceState.CANCEL_UPGRADING))) {
+        return State.STABLE;
+      }
+      return State.UPGRADING;
+    }
+  }
+
+  private static class CancelUpgradeTransition implements
+      SingleArcTransition<ServiceManager, ServiceEvent> {
+
+    /**
+     * Cancels an in-progress upgrade by moving the affected components from
+     * the upgrade definition back to the persisted service definition.
+     * Ignored (with a log line) unless the service manager is UPGRADING.
+     */
+    @Override
+    public void transition(ServiceManager serviceManager,
+        ServiceEvent event) {
+      if (!serviceManager.getState().equals(State.UPGRADING)) {
+        LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state",
+            serviceManager.getState());
+        return;
+      }
+      try {
+        // The persisted definition is what we roll back to; the upgrade
+        // definition is what components were moving towards.
+        Service targetSpec = ServiceApiUtil.loadService(
+            serviceManager.context.fs, serviceManager.getName());
+
+        Service sourceSpec = ServiceApiUtil.loadServiceUpgrade(
+            serviceManager.context.fs, serviceManager.getName(),
+            serviceManager.upgradeVersion);
+        serviceManager.cancelledVersion = serviceManager.upgradeVersion;
+        LOG.info("[SERVICE] cancel version {}",
+            serviceManager.cancelledVersion);
+        serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion();
+
+        List<org.apache.hadoop.yarn.service.api.records.Component>
+            compsToRevert = serviceManager.resolveCompsToUpgrade(sourceSpec,
+            targetSpec);
+        // resolveCompsToUpgrade may return null, but the CHECK_STABLE handling
+        // calls isEmpty()/get(0) on this field, so never store null here.
+        serviceManager.componentsToUpgrade = compsToRevert != null ?
+            compsToRevert : new LinkedList<>();
+
+        serviceManager.compsAffectedByUpgrade.clear();
+        serviceManager.componentsToUpgrade.forEach(comp ->
+            serviceManager.compsAffectedByUpgrade.add(comp.getName()));
+
+        serviceManager.dispatchNeedUpgradeEvents(true);
+        serviceManager.upgradeNextCompIfAny(true);
+        serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING);
+
+        if (serviceManager.componentsToUpgrade.isEmpty()) {
+          // No component will be reverted, so no component will report back
+          // with CHECK_STABLE; trigger the check here so the cancellation is
+          // finalized instead of staying in CANCEL_UPGRADING.
+          serviceManager.context.scheduler.getDispatcher().getEventHandler()
+              .handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
+        }
+      } catch (Exception e) {
+        // Errors (e.g. OOM) are not swallowed; they propagate to the dispatcher.
+        LOG.error("[SERVICE]: Cancellation of upgrade failed", e);
+      }
+    }
+  }
+
+  /**
+   * Dispatches an instance-level UPGRADE event (CANCEL_UPGRADE when
+   * {@code cancelUpgrade} is true) for every container of the first component
+   * in {@code componentsToUpgrade}. Does nothing when that list is empty.
+   */
+  private void upgradeNextCompIfAny(boolean cancelUpgrade) {
+    if (componentsToUpgrade.isEmpty()) {
+      return;
+    }
+    String nextCompName = componentsToUpgrade.get(0).getName();
+    ComponentInstanceEventType instanceEventType = cancelUpgrade ?
+        ComponentInstanceEventType.CANCEL_UPGRADE :
+        ComponentInstanceEventType.UPGRADE;
+    serviceSpec.getComponent(nextCompName).getContainers().forEach(
+        container -> {
+          ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
+              ContainerId.fromString(container.getId()), instanceEventType);
+          LOG.info("Upgrade container {} {}", container.getId(),
+              cancelUpgrade);
+          dispatcher.getEventHandler().handle(instanceEvent);
+        });
+  }
 
-      ComponentEvent needUpgradeEvent = new ComponentEvent(
-          component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
-          component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
-      context.scheduler.getDispatcher().getEventHandler().handle(
-          needUpgradeEvent);
+  /**
+   * Sends one component-level UPGRADE event (CANCEL_UPGRADE when
+   * {@code cancelUpgrade} is true), carrying the component's target spec and
+   * {@code upgradeVersion}, for each entry of {@code componentsToUpgrade}.
+   * No-op when that list is null.
+   */
+  private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) {
+    if (componentsToUpgrade == null) {
+      return;
+    }
+    ComponentEventType compEventType = cancelUpgrade ?
+        ComponentEventType.CANCEL_UPGRADE : ComponentEventType.UPGRADE;
+    for (org.apache.hadoop.yarn.service.api.records.Component component :
+        componentsToUpgrade) {
+      ComponentEvent needUpgradeEvent = new ComponentEvent(
+          component.getName(), compEventType)
+          .setTargetSpec(component)
+          .setUpgradeVersion(upgradeVersion);
+      LOG.info("Upgrade component {} {}", component.getName(), cancelUpgrade);
+      context.scheduler.getDispatcher().getEventHandler()
+          .handle(needUpgradeEvent);
     }
   }
 
   /**
    * @return whether finalization of upgrade was successful.
    */
-  private boolean finalizeUpgrade() {
-    try {
-      // save the application id and state to
-      Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
-          fs, getName(), upgradeVersion);
-      targetSpec.setId(serviceSpec.getId());
-      targetSpec.setState(ServiceState.STABLE);
-      Map<String, Component> allComps = scheduler.getAllComponents();
-      targetSpec.getComponents().forEach(compSpec -> {
-        Component comp = allComps.get(compSpec.getName());
-        compSpec.setState(comp.getComponentSpec().getState());
-      });
-      jsonSerDeser.save(fs.getFileSystem(),
-          ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
-      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
-    } catch (IOException e) {
-      LOG.error("Upgrade did not complete because unable to re-write the" +
-          " service definition", e);
-      return false;
+  private boolean finalizeUpgrade(boolean cancelUpgrade) {
+    if (!cancelUpgrade) {
+      try {
+        // save the application id and state to
+        Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
+            fs, getName(), upgradeVersion);
+        targetSpec.setId(serviceSpec.getId());
+        targetSpec.setState(ServiceState.STABLE);
+        Map<String, Component> allComps = scheduler.getAllComponents();
+        targetSpec.getComponents().forEach(compSpec -> {
+          Component comp = allComps.get(compSpec.getName());
+          compSpec.setState(comp.getComponentSpec().getState());
+        });
+        jsonSerDeser.save(fs.getFileSystem(),
+            ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
+      } catch (IOException e) {
+        LOG.error("Upgrade did not complete because unable to re-write the" +
+            " service definition", e);
+        return false;
+      }
     }
 
     try {
-      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
+      String upgradeVersionToDel = cancelUpgrade? cancelledVersion :
+          upgradeVersion;
+      LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel);
+      fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel);
+
+      for (String comp : compsAffectedByUpgrade) {
+        String compDirVersionToDel = cancelUpgrade? cancelledVersion :
+            serviceSpec.getVersion();
+        LOG.info("[SERVICE]: delete {} dir version {}",  comp,
+            compDirVersionToDel);
+        fs.deleteComponentDir(compDirVersionToDel, comp);
+      }
+
+      if (cancelUpgrade) {
+        fs.deleteComponentsVersionDirIfEmpty(cancelledVersion);
+      } else {
+        fs.deleteComponentsVersionDirIfEmpty(serviceSpec.getVersion());
+      }
+
     } catch (IOException e) {
       LOG.warn("Unable to delete upgrade definition for service {} " +
           "version {}", getName(), upgradeVersion);
     }
-    serviceSpec.setState(ServiceState.STABLE);
+    setServiceState(ServiceState.STABLE);
     serviceSpec.setVersion(upgradeVersion);
     upgradeVersion = null;
+    cancelledVersion = null;
+    compsAffectedByUpgrade.clear();
     return true;
   }
 
@@ -291,30 +406,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
         context.fs, context.service.getName(), upgradeVersion);
 
     List<org.apache.hadoop.yarn.service.api.records.Component>
-        compsNeedUpgradeList = componentsFinder.
-        findTargetComponentSpecs(context.service, targetSpec);
-
-    // remove all components from need upgrade list if there restart policy
-    // doesn't all upgrade.
-    if (compsNeedUpgradeList != null) {
-      compsNeedUpgradeList.removeIf(component -> {
-        org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
-            restartPolicy = component.getRestartPolicy();
-
-        final ComponentRestartPolicy restartPolicyHandler =
-            Component.getRestartPolicyHandler(restartPolicy);
-        // Do not allow upgrades for components which have NEVER/ON_FAILURE
-        // restart policy
-        if (!restartPolicyHandler.allowUpgrades()) {
-          LOG.info("The component {} has a restart policy that doesnt " +
-                  "allow upgrades {} ", component.getName(),
-              component.getRestartPolicy().toString());
-          return true;
-        }
-
-        return false;
-      });
-    }
+        compsNeedUpgradeList = resolveCompsToUpgrade(context.service,
+        targetSpec);
 
     ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
         .setVersion(upgradeVersion)
@@ -334,7 +427,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       List<String> resolvedComps = ServiceApiUtil
           .resolveCompsDependency(targetSpec);
 
-      Queue<org.apache.hadoop.yarn.service.api.records.Component>
+      List<org.apache.hadoop.yarn.service.api.records.Component>
           orderedCompUpgrade = new LinkedList<>();
       resolvedComps.forEach(compName -> {
         org.apache.hadoop.yarn.service.api.records.Component component =
@@ -343,30 +436,68 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
           orderedCompUpgrade.add(component);
         }
       });
-      event.setCompsToUpgradeInOrder(orderedCompUpgrade);
+      event.setCompsToUpgrade(orderedCompUpgrade);
+    } else {
+      event.setCompsToUpgrade(compsNeedUpgradeList);
     }
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        event);
 
-    context.scheduler.getDispatcher().getEventHandler().handle(event);
-
-    if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
-      if (!expressUpgrade) {
-        compsNeedUpgradeList.forEach(component -> {
-          ComponentEvent needUpgradeEvent = new ComponentEvent(
-              component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
-              component).setUpgradeVersion(event.getVersion());
-          context.scheduler.getDispatcher().getEventHandler().handle(
-              needUpgradeEvent);
-
-        });
-      }
-    }  else if (autoFinalize) {
-      // nothing to upgrade if upgrade auto finalize is requested, trigger a
+    if (autoFinalize && (compsNeedUpgradeList == null ||
+        compsNeedUpgradeList.isEmpty())) {
+      // nothing to upgrade and auto finalize is requested, trigger a
       // state check.
       context.scheduler.getDispatcher().getEventHandler().handle(
           new ServiceEvent(ServiceEventType.CHECK_STABLE));
     }
   }
 
+  /**
+   * Asks {@code componentsFinder} which components differ between
+   * {@code sourceSpec} and {@code targetSpec}, then drops every component
+   * whose restart-policy handler does not allow upgrades.
+   *
+   * @param sourceSpec spec to diff from (the running definition when
+   *     upgrading, the upgrade definition when cancelling)
+   * @param targetSpec spec to diff to
+   * @return the finder's list, filtered in place; {@code null} if the finder
+   *     returned {@code null}
+   */
+  private List<org.apache.hadoop.yarn.service.api.records.Component>
+      resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) {
+
+    List<org.apache.hadoop.yarn.service.api.records.Component>
+        compsNeedUpgradeList = componentsFinder.
+        findTargetComponentSpecs(sourceSpec, targetSpec);
+
+    // Remove every component whose restart policy doesn't allow upgrades
+    // from the need-upgrade list.
+    if (compsNeedUpgradeList != null) {
+      compsNeedUpgradeList.removeIf(component -> {
+        org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
+            restartPolicy = component.getRestartPolicy();
+
+        final ComponentRestartPolicy restartPolicyHandler =
+            Component.getRestartPolicyHandler(restartPolicy);
+        // Do not allow upgrades for components which have NEVER/ON_FAILURE
+        // restart policy
+        if (!restartPolicyHandler.allowUpgrades()) {
+          LOG.info("The component {} has a restart policy that doesnt " +
+                  "allow upgrades {} ", component.getName(),
+              component.getRestartPolicy().toString());
+          return true;
+        }
+
+        return false;
+      });
+    }
+
+    return compsNeedUpgradeList;
+  }
+
+  /**
+   * Sets the state of the service in the service spec, logging the
+   * transition only when the state actually changes.
+   * @param state service state
+   */
+  private void setServiceState(
+      org.apache.hadoop.yarn.service.api.records.ServiceState state) {
+    org.apache.hadoop.yarn.service.api.records.ServiceState curState =
+        serviceSpec.getState();
+    // Enum identity comparison; unlike curState.equals(...) this does not
+    // throw when the spec has no state (null) yet.
+    if (curState != state) {
+      serviceSpec.setState(state);
+      LOG.info("[SERVICE] spec state changed from {} -> {}", curState, state);
+    }
+  }
+
   /**
    * Returns the name of the service.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
index cac527a..a6e9a2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
@@ -27,5 +27,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public enum ContainerState {
   RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
-  FAILED;
+  FAILED, FAILED_UPGRADE;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index 49c1985..3f2f4f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -29,5 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 @ApiModel(description = "The current state of an service.")
 public enum ServiceState {
   ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
-  UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
+  UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED, CANCEL_UPGRADING;
+
+  /**
+   * Returns whether {@code state} is UPGRADING, UPGRADING_AUTO_FINALIZE or
+   * EXPRESS_UPGRADING. CANCEL_UPGRADING is not counted.
+   *
+   * @param state state to test; may be {@code null}
+   * @return true for the upgrade states above, false otherwise (incl. null)
+   */
+  public static boolean isUpgrading(ServiceState state) {
+    // == on enum constants is null-safe, unlike state.equals(...)
+    return state == UPGRADING || state == UPGRADING_AUTO_FINALIZE
+        || state == EXPRESS_UPGRADING;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index a27ed87..23db57e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -353,6 +354,26 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
   }
 
   @Override
+  public int actionCancelUpgrade(String appName) throws IOException,
+      YarnException {
+    // Only a service reported in one of the upgrading states can be cancelled.
+    Service liveService = getStatus(appName);
+    boolean upgrading = liveService != null
+        && ServiceState.isUpgrading(liveService.getState());
+    if (!upgrading) {
+      throw new YarnException("Service " + appName + " is not upgrading, " +
+          "so nothing to cancel.");
+    }
+
+    // The cancel request goes to the AM over ClientAMProtocol, so the AM
+    // host reported for the application must be non-empty.
+    ApplicationReport report =
+        yarnClient.getApplicationReport(getAppId(appName));
+    if (StringUtils.isEmpty(report.getHost())) {
+      throw new YarnException(appName + " AM hostname is empty");
+    }
+    createAMProxy(appName, report)
+        .cancelUpgrade(CancelUpgradeRequestProto.newBuilder().build());
+    return EXIT_SUCCESS;
+  }
+
+  @Override
   public int actionCleanUp(String appName, String userName) throws
       IOException, YarnException {
     if (cleanUpRegistry(appName, userName)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index acf3404..526bde0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import static org.apache.hadoop.yarn.service.api.records.Component
@@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
@@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
 import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE;
 import static org.apache.hadoop.yarn.service.component.ComponentState.*;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
 import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
@@ -126,9 +126,8 @@ public class Component implements EventHandler<ComponentEvent> {
       new ConcurrentHashMap<>();
   private boolean healthThresholdMonitorEnabled = false;
 
-  private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
-  private ComponentEvent upgradeEvent;
-  private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
+  private UpgradeStatus upgradeStatus = new UpgradeStatus();
+  private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus();
 
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
@@ -160,6 +159,8 @@ public class Component implements EventHandler<ComponentEvent> {
           // Flex while previous flex is still in progress
           .addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
               new FlexComponentTransition())
+          .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
 
           // container failed while stable
           .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
@@ -172,19 +173,28 @@ public class Component implements EventHandler<ComponentEvent> {
           // For flex down, go to STABLE state
           .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
               FLEX, new FlexComponentTransition())
-          .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
-              new ComponentNeedsUpgradeTransition())
-          //Upgrade while previous upgrade is still in progress
-          .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
-              new ComponentNeedsUpgradeTransition())
-          .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
-              CHECK_STABLE, new CheckStableTransition())
-          .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
-              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(STABLE, UPGRADING, UPGRADE,
+              new NeedsUpgradeTransition())
+          .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
+              new NeedsUpgradeTransition())
           .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
               new CheckStableTransition())
-          .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
-              new ContainerCompletedTransition())
+
+          // Cancel upgrade while previous upgrade is still in progress
+          .addTransition(UPGRADING, CANCEL_UPGRADING,
+              CANCEL_UPGRADE, new NeedsUpgradeTransition())
+          .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
+              new CompletedAfterUpgradeTransition())
+
+          .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
+              STABLE), CHECK_STABLE, new CheckStableTransition())
+          .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING,
+              CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
+          .addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
+              new ContainerAllocatedTransition())
+
           .installTopology();
 
   public Component(
@@ -332,7 +342,7 @@ public class Component implements EventHandler<ComponentEvent> {
                 + before + " to " + event.getDesired());
         component.requestContainers(delta);
         component.createNumCompInstances(delta);
-        component.componentSpec.setState(
+        component.setComponentState(
             org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
         component.getScheduler().getApp().setState(ServiceState.STARTED);
         return FLEXING;
@@ -430,11 +440,11 @@ public class Component implements EventHandler<ComponentEvent> {
     if (component.getNumRunningInstances() + component
         .getNumSucceededInstances() + component.getNumFailedInstances()
         < component.getComponentSpec().getNumberOfContainers()) {
-      component.componentSpec.setState(
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
       return FLEXING;
     } else{
-      component.componentSpec.setState(
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
     }
@@ -444,22 +454,22 @@ public class Component implements EventHandler<ComponentEvent> {
       Component component) {
     // if desired == running
     if (component.componentMetrics.containersReady.value() == component
-        .getComponentSpec().getNumberOfContainers()
-        && component.numContainersThatNeedUpgrade.get() == 0) {
-      component.componentSpec.setState(
+        .getComponentSpec().getNumberOfContainers() &&
+        !component.doesNeedUpgrade()) {
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
+    } else if (component.doesNeedUpgrade()) {
+      component.setComponentState(org.apache.hadoop.yarn.service.api.records.
+          ComponentState.NEEDS_UPGRADE);
+      return component.getState();
     } else if (component.componentMetrics.containersReady.value() != component
         .getComponentSpec().getNumberOfContainers()) {
-      component.componentSpec.setState(
+      component.setComponentState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
       return FLEXING;
-    } else {
-      //  component.numContainersThatNeedUpgrade.get() > 0
-      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
-          records.ComponentState.NEEDS_UPGRADE);
-      return UPGRADING;
     }
+    return component.getState();
   }
 
   // This method should be called whenever there is an increment or decrement
@@ -467,22 +477,16 @@ public class Component implements EventHandler<ComponentEvent> {
   //This should not matter for terminating components
   private static synchronized void checkAndUpdateComponentState(
       Component component, boolean isIncrement) {
-    org.apache.hadoop.yarn.service.api.records.ComponentState curState =
-        component.componentSpec.getState();
 
     if (component.getRestartPolicyHandler().isLongLived()) {
       if (isIncrement) {
         // check if all containers are in READY state
-        if (component.numContainersThatNeedUpgrade.get() == 0
-            && component.componentMetrics.containersReady.value()
-            == component.componentMetrics.containersDesired.value()) {
-          component.componentSpec.setState(
+        if (!component.upgradeStatus.areContainersUpgrading() &&
+            !component.cancelUpgradeStatus.areContainersUpgrading() &&
+            component.componentMetrics.containersReady.value() ==
+                component.componentMetrics.containersDesired.value()) {
+          component.setComponentState(
               org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
-          if (curState != component.componentSpec.getState()) {
-            LOG.info("[COMPONENT {}] state changed from {} -> {}",
-                component.componentSpec.getName(), curState,
-                component.componentSpec.getState());
-          }
           // component state change will trigger re-check of service state
           component.context.getServiceManager().checkAndUpdateServiceState();
         }
@@ -491,19 +495,14 @@ public class Component implements EventHandler<ComponentEvent> {
         // still need to verify the count before changing the component state
         if (component.componentMetrics.containersReady.value()
             < component.componentMetrics.containersDesired.value()) {
-          component.componentSpec.setState(
+          component.setComponentState(
               org.apache.hadoop.yarn.service.api.records.ComponentState
                   .FLEXING);
         } else if (component.componentMetrics.containersReady.value()
             == component.componentMetrics.containersDesired.value()) {
-          component.componentSpec.setState(
+          component.setComponentState(
               org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
         }
-        if (curState != component.componentSpec.getState()) {
-          LOG.info("[COMPONENT {}] state changed from {} -> {}",
-              component.componentSpec.getName(), curState,
-              component.componentSpec.getState());
-        }
         // component state change will trigger re-check of service state
         component.context.getServiceManager().checkAndUpdateServiceState();
       }
@@ -511,8 +510,8 @@ public class Component implements EventHandler<ComponentEvent> {
       // component state change will trigger re-check of service state
       component.context.getServiceManager().checkAndUpdateServiceState();
     }
-    // when the service is stable then the state of component needs to
-    // transition to stable
+    // triggers the state machine in component to reach appropriate state
+    // once the state in spec is changed.
     component.dispatcher.getEventHandler().handle(
         new ComponentEvent(component.getName(),
             ComponentEventType.CHECK_STABLE));
@@ -544,25 +543,43 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  private static class ComponentNeedsUpgradeTransition extends BaseTransition {
+  private static class CompletedAfterUpgradeTransition extends BaseTransition {
     @Override
     public void transition(Component component, ComponentEvent event) {
-      component.upgradeInProgress.set(true);
-      component.upgradeEvent = event;
-      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
-          records.ComponentState.NEEDS_UPGRADE);
-      component.numContainersThatNeedUpgrade.set(
+      Preconditions.checkNotNull(event.getContainerId());
+      component.updateMetrics(event.getStatus());
+      component.dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getContainerId(), STOP)
+              .setStatus(event.getStatus()));
+    }
+  }
+
+  private static class NeedsUpgradeTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      boolean isCancel = event.getType().equals(CANCEL_UPGRADE);
+      UpgradeStatus status = !isCancel ? component.upgradeStatus :
+          component.cancelUpgradeStatus;
+
+      status.inProgress.set(true);
+      status.targetSpec = event.getTargetSpec();
+      status.targetVersion = event.getUpgradeVersion();
+      LOG.info("[COMPONENT {}]: need upgrade to {}",
+          component.getName(), status.targetVersion);
+
+      status.containersNeedUpgrade.set(
           component.componentSpec.getNumberOfContainers());
-      component.componentSpec.getContainers().forEach(container -> {
-        container.setState(ContainerState.NEEDS_UPGRADE);
-        if (event.isExpressUpgrade()) {
-          ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
-              ContainerId.fromString(container.getId()),
-                  ComponentInstanceEventType.UPGRADE);
-          LOG.info("Upgrade container {}", container.getId());
-          component.dispatcher.getEventHandler().handle(upgradeEvent);
-        }
+
+      component.setComponentState(org.apache.hadoop.yarn.service.api.
+          records.ComponentState.NEEDS_UPGRADE);
+
+      component.getAllComponentInstances().forEach(instance -> {
+        instance.setContainerState(ContainerState.NEEDS_UPGRADE);
       });
+
+      if (event.getType().equals(CANCEL_UPGRADE)) {
+        component.upgradeStatus.reset();
+      }
     }
   }
 
@@ -572,22 +589,22 @@ public class Component implements EventHandler<ComponentEvent> {
     @Override
     public ComponentState transition(Component component,
         ComponentEvent componentEvent) {
-      org.apache.hadoop.yarn.service.api.records.ComponentState currState =
-          component.componentSpec.getState();
-      if (currState.equals(org.apache.hadoop.yarn.service.api.records
-          .ComponentState.STABLE)) {
-        return ComponentState.STABLE;
-      }
       // checkIfStable also updates the state in definition when STABLE
       ComponentState targetState = checkIfStable(component);
-      if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
-        component.componentSpec.overwrite(
-            component.upgradeEvent.getTargetSpec());
-        component.upgradeEvent = null;
+
+      if (targetState.equals(STABLE) &&
+          !(component.upgradeStatus.isCompleted() &&
+              component.cancelUpgradeStatus.isCompleted())) {
+        // Component stable after upgrade or cancel upgrade
+        UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ?
+            component.cancelUpgradeStatus : component.upgradeStatus;
+
+        component.componentSpec.overwrite(status.getTargetSpec());
+        status.reset();
+
         ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
             CHECK_STABLE);
         component.dispatcher.getEventHandler().handle(checkStable);
-        component.upgradeInProgress.set(false);
       }
       return targetState;
     }
@@ -625,11 +642,14 @@ public class Component implements EventHandler<ComponentEvent> {
         "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
         getName(), container.getId(), instance.getCompInstanceName(),
         container.getNodeId());
-    if (upgradeInProgress.get()) {
+    if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) {
+      UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ?
+          cancelUpgradeStatus : upgradeStatus;
+
       scheduler.getContainerLaunchService()
           .launchCompInstance(scheduler.getApp(), instance, container,
-              createLaunchContext(upgradeEvent.getTargetSpec(),
-                  upgradeEvent.getUpgradeVersion()));
+              createLaunchContext(status.getTargetSpec(),
+                  status.getTargetVersion()));
     } else {
       scheduler.getContainerLaunchService().launchCompInstance(
           scheduler.getApp(), instance, container,
@@ -830,6 +850,12 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
+  /** Whether (cancel-)upgrade work is pending or an upgrade has failed. */
+  private boolean doesNeedUpgrade() {
+    return cancelUpgradeStatus.areContainersUpgrading()
+        || upgradeStatus.areContainersUpgrading() || upgradeStatus.failed.get();
+  }
+
   public boolean areDependenciesReady() {
     List<String> dependencies = componentSpec.getDependencies();
     if (ServiceUtils.isEmpty(dependencies)) {
@@ -911,10 +937,6 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  public void decContainersThatNeedUpgrade() {
-    numContainersThatNeedUpgrade.decrementAndGet();
-  }
-
   public int getNumReadyInstances() {
     return componentMetrics.containersReady.value();
   }
@@ -972,10 +994,33 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
-  public ComponentEvent getUpgradeEvent() {
+  /**
+   * Returns true while an upgrade or a cancel-upgrade is still in progress.
+   */
+  public boolean isUpgrading() {
+    this.readLock.lock();
+    try {
+      return !upgradeStatus.isCompleted()
+          || !cancelUpgradeStatus.isCompleted();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /** Returns the status tracker of the regular (non-cancel) upgrade. */
+  public UpgradeStatus getUpgradeStatus() {
+    readLock.lock();
+    try {
+      return upgradeStatus;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public UpgradeStatus getCancelUpgradeStatus() {
     this.readLock.lock();
     try {
-      return upgradeEvent;
+      return cancelUpgradeStatus;
     } finally {
       this.readLock.unlock();
     }
@@ -1013,6 +1058,70 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
+  /**
+   * Sets the state in the component spec, logging only when it changes.
+   * @param state component state
+   */
+  private void setComponentState(
+      org.apache.hadoop.yarn.service.api.records.ComponentState state) {
+    org.apache.hadoop.yarn.service.api.records.ComponentState curState =
+        componentSpec.getState();
+    if (curState != state) { // enum identity check, also safe for null
+      componentSpec.setState(state);
+      LOG.info("[COMPONENT {}] spec state changed from {} -> {}",
+          componentSpec.getName(), curState, state);
+    }
+  }
+
+  /**
+   * Status of an upgrade or of a cancel-upgrade: target spec/version, whether
+   * it is in progress, pending container count and a failure flag.
+   */
+  public static class UpgradeStatus {
+    private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
+    private String targetVersion;
+    private final AtomicBoolean inProgress = new AtomicBoolean(false);
+    private final AtomicLong containersNeedUpgrade = new AtomicLong(0);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+
+    public org.apache.hadoop.yarn.service.api.records.
+        Component getTargetSpec() {
+      return targetSpec;
+    }
+
+    public String getTargetVersion() {
+      return targetVersion;
+    }
+
+    /** @return whether the upgrade is completed, i.e. not in progress */
+    public boolean isCompleted() {
+      return !inProgress.get();
+    }
+
+    /** Decrements the pending count; a no-op when not in progress. */
+    public void decContainersThatNeedUpgrade() {
+      if (inProgress.get()) {
+        containersNeedUpgrade.decrementAndGet();
+      }
+    }
+
+    public void containerFailedUpgrade() {
+      failed.set(true);
+    }
+
+    void reset() {
+      containersNeedUpgrade.set(0);
+      targetSpec = null;
+      targetVersion = null;
+      inProgress.set(false);
+      failed.set(false);
+    }
+
+    boolean areContainersUpgrading() {
+      return containersNeedUpgrade.get() != 0;
+    }
+  }
+
   public ServiceContext getContext() {
     return context;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 643961d..84caa77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -35,7 +35,6 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
   private ContainerId containerId;
   private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
   private String upgradeVersion;
-  private boolean expressUpgrade;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -114,13 +113,4 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
     this.upgradeVersion = upgradeVersion;
     return this;
   }
-
-  public boolean isExpressUpgrade() {
-    return expressUpgrade;
-  }
-
-  public ComponentEvent setExpressUpgrade(boolean expressUpgrade) {
-    this.expressUpgrade = expressUpgrade;
-    return this;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
index 44d781f..d211f49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -24,6 +24,7 @@ public enum ComponentEventType {
   CONTAINER_RECOVERED,
   CONTAINER_STARTED,
   CONTAINER_COMPLETED,
+  CANCEL_UPGRADE,
   UPGRADE,
   CHECK_STABLE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
index 0f63d03..e1cd0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
@@ -22,5 +22,6 @@ public enum ComponentState {
   INIT,
   FLEXING,
   STABLE,
-  UPGRADING
+  UPGRADING,
+  CANCEL_UPGRADING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index afd8c67..89c9a22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -64,8 +65,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -84,8 +87,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       LoggerFactory.getLogger(ComponentInstance.class);
   private static final String FAILED_BEFORE_LAUNCH_DIAG =
       "failed before launch";
+  private static final String UPGRADE_FAILED = "upgrade failed";
 
-  private  StateMachine<ComponentInstanceState, ComponentInstanceEventType,
+  private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
       ComponentInstanceEvent> stateMachine;
   private Component component;
   private final ReadLock readLock;
@@ -106,7 +110,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
   // This container object is used for rest API query
   private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
   private String serviceVersion;
-
+  private final AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
+  private boolean pendingCancelUpgrade = false; // deferred CANCEL_UPGRADE
 
   private static final StateMachineFactory<ComponentInstance,
       ComponentInstanceState, ComponentInstanceEventType,
@@ -132,13 +137,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       .addTransition(READY, STARTED, BECOME_NOT_READY,
           new ContainerBecomeNotReadyTransition())
       .addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
-      .addTransition(READY, UPGRADING, UPGRADE,
-          new ContainerUpgradeTransition())
-      .addTransition(UPGRADING, UPGRADING, UPGRADE,
-          new ContainerUpgradeTransition())
-      .addTransition(UPGRADING, READY, BECOME_READY,
-          new ContainerBecomeReadyTransition())
-      .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
+      .addTransition(READY, UPGRADING, UPGRADE, new UpgradeTransition())
+      .addTransition(READY, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE,
+          new CancelUpgradeTransition())
+
+      // FROM UPGRADING
+      .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING),
+          CANCEL_UPGRADE, new CancelUpgradeTransition())
+      .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY,
+          new ReadyAfterUpgradeTransition())
+      .addTransition(UPGRADING, UPGRADING, STOP,
+          new StoppedAfterUpgradeTransition())
+
+      // FROM CANCEL_UPGRADING
+      .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY),
+          BECOME_READY, new ReadyAfterUpgradeTransition())
+      .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
+          STOP, new StoppedAfterCancelUpgradeTransition())
       .installTopology();
 
   public ComponentInstance(Component component,
@@ -217,24 +232,53 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
-      compInstance.containerSpec.setState(ContainerState.READY);
-      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
-        compInstance.component.incContainersReady(false);
-        compInstance.component.decContainersThatNeedUpgrade();
-        compInstance.serviceVersion = compInstance.component.getUpgradeEvent()
-            .getUpgradeVersion();
-        ComponentEvent checkState = new ComponentEvent(
-            compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
-        compInstance.scheduler.getDispatcher().getEventHandler().handle(
-            checkState);
+      compInstance.setContainerState(ContainerState.READY);
+      compInstance.component.incContainersReady(true);
+      compInstance.postContainerReady();
+    }
+  }
 
-      } else {
-        compInstance.component.incContainersReady(true);
-      }
-      if (compInstance.timelineServiceEnabled) {
-        compInstance.serviceTimelinePublisher
-            .componentInstanceBecomeReady(compInstance.containerSpec);
+  /** Handles BECOME_READY after an upgrade or cancel-upgrade re-init. */
+  private static class ReadyAfterUpgradeTransition implements
+      MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+          ComponentInstanceState> {
+    @Override
+    public ComponentInstanceState transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (instance.pendingCancelUpgrade) {
+        // cancellation of upgrade was triggered before the upgrade was
+        // finished.
+        LOG.info("{} received ready but cancellation pending",
+            event.getContainerId());
+        instance.upgradeInProgress.set(true);
+        instance.cancelUpgrade();
+        instance.pendingCancelUpgrade = false;
+        return instance.getState();
       }
+      // re-init done: mark the instance READY again
+      instance.upgradeInProgress.set(false);
+      instance.setContainerState(ContainerState.READY);
+      instance.component.incContainersReady(false);
+      // settle the status (upgrade or cancel-upgrade) this re-init served
+      Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
+          instance.component.getUpgradeStatus() :
+          instance.component.getCancelUpgradeStatus();
+      status.decContainersThatNeedUpgrade();
+
+      instance.serviceVersion = status.getTargetVersion();
+      ComponentEvent checkState = new ComponentEvent(
+          instance.component.getName(),
+          ComponentEventType.CHECK_STABLE);
+      instance.scheduler.getDispatcher().getEventHandler().handle(checkState);
+      instance.postContainerReady();
+      return ComponentInstanceState.READY;
+    }
+  }
+
+  /** Records the instance becoming ready in ATS, if timeline is enabled. */
+  private void postContainerReady() {
+    if (timelineServiceEnabled) {
+      serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec);
     }
   }
 
@@ -242,7 +286,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
-      compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
+      compInstance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
       compInstance.component.decContainersReady(true);
     }
   }
@@ -276,11 +320,13 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
           " completed. Reinsert back to pending list and requested ");
       builder.append("a new container.").append(System.lineSeparator());
       builder.append(" exitStatus=").append(
-          failureBeforeLaunch ? null : event.getStatus().getExitStatus());
+          failureBeforeLaunch || event.getStatus() == null ? null :
+              event.getStatus().getExitStatus());
       builder.append(", diagnostics=");
       builder.append(failureBeforeLaunch ?
           FAILED_BEFORE_LAUNCH_DIAG :
-          event.getStatus().getDiagnostics());
+          (event.getStatus() != null ? event.getStatus().getDiagnostics() :
+              UPGRADE_FAILED));
 
       if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
         LOG.error(builder.toString());
@@ -342,15 +388,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         ComponentInstanceEvent event) {
 
       Component comp = compInstance.component;
+      ContainerStatus status = event.getStatus();
+      // status is not available when upgrade fails
       String containerDiag = compInstance.getCompInstanceId() + ": " + (
-          failedBeforeLaunching ?
-              FAILED_BEFORE_LAUNCH_DIAG :
-              event.getStatus().getDiagnostics());
+          failedBeforeLaunching ? FAILED_BEFORE_LAUNCH_DIAG :
+              (status != null ? status.getDiagnostics() : UPGRADE_FAILED));
       compInstance.diagnostics.append(containerDiag + System.lineSeparator());
       compInstance.cancelContainerStatusRetriever();
-      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
-        compInstance.component.decContainersThatNeedUpgrade();
-      }
+
       if (compInstance.getState().equals(READY)) {
         compInstance.component.decContainersReady(true);
       }
@@ -387,10 +432,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
           // record in ATS
           compInstance.scheduler.getServiceTimelinePublisher()
               .componentInstanceFinished(compInstance.getContainer().getId(),
-                  failedBeforeLaunching ?
-                      -1 :
-                      event.getStatus().getExitStatus(), ContainerState.FAILED,
-                  containerDiag);
+                  failedBeforeLaunching || status == null ? -1 :
+                      status.getExitStatus(),
+                  ContainerState.FAILED, containerDiag);
 
           // mark other component-instances/containers as STOPPED
           for (ContainerId containerId : scheduler.getLiveInstances()
@@ -449,28 +493,129 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         .equals(state) || ContainerState.SUCCEEDED.equals(state);
   }
 
-  private static class ContainerUpgradeTransition extends BaseTransition {
+  /** Handles STOP while UPGRADING: the container's upgrade failed. */
+  private static class StoppedAfterUpgradeTransition extends BaseTransition {
 
     @Override
-    public void transition(ComponentInstance compInstance,
+    public void transition(ComponentInstance instance,
         ComponentInstanceEvent event) {
-      if (!compInstance.containerSpec.getState().equals(
-          ContainerState.NEEDS_UPGRADE)) {
-        //nothing to upgrade. this may happen with express upgrade.
+      instance.component.getUpgradeStatus().decContainersThatNeedUpgrade();
+      instance.component.decRunningContainers();
+      // release the container; clean registry (async); drop live instance
+      final ServiceScheduler scheduler = instance.component.getScheduler();
+      scheduler.getAmRMClient().releaseAssignedContainer(
+          event.getContainerId());
+      instance.scheduler.executorService.submit(
+          () -> instance.cleanupRegistry(event.getContainerId()));
+      scheduler.removeLiveCompInstance(event.getContainerId());
+      instance.component.getUpgradeStatus().containerFailedUpgrade();
+      instance.setContainerState(ContainerState.FAILED_UPGRADE);
+      instance.upgradeInProgress.set(false);
+    }
+  }
+
+  /** Handles STOP of an instance whose upgrade is being cancelled. */
+  private static class StoppedAfterCancelUpgradeTransition implements
+      MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+          ComponentInstanceState> {
+    private final ContainerStoppedTransition stoppedTransition =
+        new ContainerStoppedTransition();
+
+    @Override
+    public ComponentInstanceState transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (instance.pendingCancelUpgrade) {
+        // cancellation of upgrade was triggered before the upgrade was
+        // finished.
+        LOG.info("{} received stopped but cancellation pending",
+            event.getContainerId());
+        instance.upgradeInProgress.set(true);
+        instance.cancelUpgrade();
+        instance.pendingCancelUpgrade = false;
+        return instance.getState();
+      }
+      // Stopped while the upgrade was being cancelled (e.g. the re-init
+      // failed): count it as handled and process it as a regular stop.
+      instance.component.getCancelUpgradeStatus()
+          .decContainersThatNeedUpgrade();
+      instance.upgradeInProgress.set(false);
+      stoppedTransition.transition(instance, event);
+      return ComponentInstanceState.INIT;
+    }
+  }
+
+  /** Handles UPGRADE by re-initing the container, unless cancelling. */
+  private static class UpgradeTransition extends BaseTransition {
+    @Override
+    public void transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (!instance.component.getCancelUpgradeStatus().isCompleted()) {
+        // last check to see if cancellation was triggered. The component may
+        // have processed the cancel upgrade event but the instance doesn't know
+        // it yet. If cancellation has been triggered then no point in
+        // upgrading.
         return;
       }
-      compInstance.containerSpec.setState(ContainerState.UPGRADING);
-      compInstance.component.decContainersReady(false);
-      ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
-      compInstance.scheduler.getContainerLaunchService()
-          .reInitCompInstance(compInstance.scheduler.getApp(), compInstance,
-              compInstance.container,
-              compInstance.component.createLaunchContext(
-                  upgradeEvent.getTargetSpec(),
-                  upgradeEvent.getUpgradeVersion()));
+      instance.upgradeInProgress.set(true);
+      instance.setContainerState(ContainerState.UPGRADING);
+      instance.component.decContainersReady(false);
+      // re-init the container using the regular upgrade's target
+      Component.UpgradeStatus status = instance.component.getUpgradeStatus();
+      instance.scheduler.getContainerLaunchService()
+          .reInitCompInstance(instance.scheduler.getApp(), instance,
+              instance.container,
+              instance.component.createLaunchContext(
+                  status.getTargetSpec(),
+                  status.getTargetVersion()));
     }
   }
 
+  /** Handles CANCEL_UPGRADE; deferred while this instance is mid-upgrade. */
+  private static class CancelUpgradeTransition implements
+      MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+          ComponentInstanceState> {
+    @Override
+    public ComponentInstanceState transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (instance.upgradeInProgress.compareAndSet(false, true)) {
+        Component.UpgradeStatus cancelStatus = instance.component
+            .getCancelUpgradeStatus();
+        if (instance.getServiceVersion().equals(
+            cancelStatus.getTargetVersion())) {
+          // previous upgrade didn't happen so just go back to READY
+          LOG.info("{} nothing to cancel", event.getContainerId());
+          // no re-init is started here: release the flag set just above
+          instance.upgradeInProgress.set(false);
+          cancelStatus.decContainersThatNeedUpgrade();
+          instance.setContainerState(ContainerState.READY);
+          ComponentEvent checkState = new ComponentEvent(
+              instance.component.getName(), ComponentEventType.CHECK_STABLE);
+          instance.scheduler.getDispatcher().getEventHandler()
+              .handle(checkState);
+          return ComponentInstanceState.READY;
+        }
+        // service version differs from the cancel target: re-init back to it
+        instance.component.decContainersReady(false);
+        instance.cancelUpgrade();
+      } else {
+        LOG.info("{} pending cancellation", event.getContainerId());
+        instance.pendingCancelUpgrade = true;
+      }
+      return ComponentInstanceState.CANCEL_UPGRADING;
+    }
+  }
+
+  /** Marks the container UPGRADING and re-inits it to the cancel target. */
+  private void cancelUpgrade() {
+    LOG.info("{} cancelling upgrade", container.getId());
+    setContainerState(ContainerState.UPGRADING);
+    Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
+    scheduler.getContainerLaunchService()
+        .reInitCompInstance(scheduler.getApp(), this,
+            this.container, this.component.createLaunchContext(
+                cancelStatus.getTargetSpec(), cancelStatus.getTargetVersion()));
+  }
+
   public ComponentInstanceState getState() {
     this.readLock.lock();
 
@@ -505,6 +650,26 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  /**
+   * Sets the state of the container in the container spec under the write
+   * lock, logging the transition only when the state actually changes.
+   *
+   * @param state container state
+   */
+  public void setContainerState(ContainerState state) {
+    this.writeLock.lock();
+    try {
+      ContainerState curState = containerSpec.getState();
+      if (curState != state) {
+        containerSpec.setState(state);
+        LOG.info("{} spec state changed from {} -> {}",
+            getCompInstanceId(), curState, state);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
   @Override
   public void handle(ComponentInstanceEvent event) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
index 665b8fa..b9181e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
@@ -23,5 +23,6 @@ public enum ComponentInstanceEventType {
   STOP,
   BECOME_READY,
   BECOME_NOT_READY,
-  UPGRADE
+  UPGRADE,
+  CANCEL_UPGRADE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
index f5de5cb..28cbcf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
@@ -22,5 +22,6 @@ public enum ComponentInstanceState {
   INIT,
   STARTED,
   READY,
-  UPGRADING
+  UPGRADING,
+  CANCEL_UPGRADING
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org