You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/09/26 18:53:46 UTC
[1/2] hadoop git commit: YARN-8665. Added Yarn service cancel upgrade
option. Contributed by Chandni Singh
Repository: hadoop
Updated Branches:
refs/heads/trunk e0ff8e2c1 -> 913f87dad
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index 3c856ec..153ab46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
@@ -113,7 +112,8 @@ public class ContainerLaunchService extends AbstractService{
.startContainerAsync(container,
launcher.completeContainerLaunch());
} else {
- LOG.info("reInitializing container {}", container.getId());
+ LOG.info("reInitializing container {} with version {}",
+ container.getId(), componentLaunchContext.getServiceVersion());
instance.getComponent().getScheduler().getNmClient()
.reInitializeContainerAsync(container.getId(),
launcher.completeContainerLaunch(), true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
index 49ecd2e..6f37967 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
@@ -30,6 +30,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -141,4 +143,15 @@ public class ClientAMProtocolPBClientImpl
}
return null;
}
+
+ @Override
+ public CancelUpgradeResponseProto cancelUpgrade(
+ CancelUpgradeRequestProto request) throws IOException, YarnException {
+ try {
+ return proxy.cancelUpgrade(null, request);
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
index eab3f9f..071c357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -116,4 +118,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
throw new ServiceException(e);
}
}
+
+ @Override
+ public CancelUpgradeResponseProto cancelUpgrade(
+ RpcController controller, CancelUpgradeRequestProto request)
+ throws ServiceException {
+ try {
+ return real.cancelUpgrade(request);
+ } catch (IOException | YarnException e) {
+ throw new ServiceException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index ac90992..c12c340 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -167,9 +167,8 @@ public class ProviderUtils implements YarnServiceConstants {
public static Path initCompInstanceDir(SliderFileSystem fs,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
ComponentInstance instance) {
- Path compDir = new Path(new Path(fs.getAppDir(), "components"),
- compLaunchContext.getServiceVersion() + "/" +
- compLaunchContext.getName());
+ Path compDir = fs.getComponentDir(compLaunchContext.getServiceVersion(),
+ compLaunchContext.getName());
Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
instance.setCompInstanceDir(compInstanceDir);
return compInstanceDir;
@@ -184,7 +183,9 @@ public class ProviderUtils implements YarnServiceConstants {
ServiceContext context) throws IOException {
Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
if (!fs.getFileSystem().exists(compInstanceDir)) {
- log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
+ log.info("{} version {} : Creating dir on hdfs: {}",
+ instance.getCompInstanceId(), compLaunchContext.getServiceVersion(),
+ compInstanceDir);
fs.getFileSystem().mkdirs(compInstanceDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index b588e88..0eb54ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -603,7 +603,7 @@ public class ServiceApiUtil {
public static void validateInstancesUpgrade(List<Container>
liveContainers) throws YarnException {
for (Container liveContainer : liveContainers) {
- if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
+ if (!isUpgradable(liveContainer)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
@@ -613,6 +613,16 @@ public class ServiceApiUtil {
}
/**
+ * Returns whether the container can be upgraded in the current state.
+ */
+ public static boolean isUpgradable(Container container) {
+
+ return container.getState() != null &&
+ (container.getState().equals(ContainerState.NEEDS_UPGRADE) ||
+ container.getState().equals(ContainerState.FAILED_UPGRADE));
+ }
+
+ /**
* Validates the components that are requested to upgrade require an upgrade.
* It returns the instances of the components which need upgrade.
*/
@@ -629,7 +639,7 @@ public class ServiceApiUtil {
ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
}
liveComp.getContainers().forEach(liveContainer -> {
- if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
+ if (isUpgradable(liveContainer)) {
containerNeedUpgrade.add(liveContainer);
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
index d6d664e..c776476 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -48,4 +50,51 @@ public class SliderFileSystem extends CoreFileSystem {
public Path getAppDir() {
return this.appDir;
}
+
+ /**
+ * Returns the component directory path.
+ *
+ * @param serviceVersion service version
+ * @param compName component name
+ * @return component directory
+ */
+ public Path getComponentDir(String serviceVersion, String compName) {
+ return new Path(new Path(getAppDir(), "components"),
+ serviceVersion + "/" + compName);
+ }
+
+ /**
+ * Deletes the component directory, if it exists.
+ *
+ * @param serviceVersion service version
+ * @param compName component name
+ * @throws IOException if the file system operation fails
+ */
+ public void deleteComponentDir(String serviceVersion, String compName)
+ throws IOException {
+ Path path = getComponentDir(serviceVersion, compName);
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, true);
+ LOG.debug("deleted dir {}", path);
+ }
+ }
+
+ /**
+ * Deletes the components version directory if it exists and is empty.
+ *
+ * @param serviceVersion service version
+ * @throws IOException if the file system operation fails
+ */
+ public void deleteComponentsVersionDirIfEmpty(String serviceVersion)
+ throws IOException {
+ Path path = new Path(new Path(getAppDir(), "components"), serviceVersion);
+ if (fileSystem.exists(path) && fileSystem.listStatus(path).length == 0) {
+ fileSystem.delete(path, true);
+ LOG.info("deleted dir {}", path);
+ }
+ }
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SliderFileSystem.class);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
index 169f765..bcf893e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
@@ -28,6 +28,8 @@ service ClientAMProtocolService {
rpc stop(StopRequestProto) returns (StopResponseProto);
rpc upgradeService(UpgradeServiceRequestProto)
returns (UpgradeServiceResponseProto);
+ rpc cancelUpgrade(CancelUpgradeRequestProto)
+ returns (CancelUpgradeResponseProto);
rpc restartService(RestartServiceRequestProto)
returns (RestartServiceResponseProto);
rpc upgrade(CompInstancesUpgradeRequestProto) returns
@@ -73,6 +75,12 @@ message UpgradeServiceResponseProto {
optional string error = 1;
}
+message CancelUpgradeRequestProto {
+}
+
+message CancelUpgradeResponseProto {
+}
+
message RestartServiceRequestProto {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
index 321b2cd..b685f4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
@@ -94,21 +94,25 @@ public class MockRunningServiceContext extends ServiceContext {
return mockLaunchService;
}
- @Override public ServiceUtils.ProcessTerminationHandler
- getTerminationHandler() {
+ @Override
+ public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
return new
- ServiceUtils.ProcessTerminationHandler() {
- public void terminate(int exitCode) {
- }
- };
+ ServiceUtils.ProcessTerminationHandler() {
+ public void terminate(int exitCode) {
+ }
+ };
+ }
+
+ @Override
+ protected ServiceManager createServiceManager() {
+ return ServiceTestUtils.createServiceManager(
+ MockRunningServiceContext.this);
}
};
this.scheduler.init(fsWatcher.getConf());
- ServiceTestUtils.createServiceManager(this);
-
doNothing().when(mockLaunchService).
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
stabilizeComponents(this);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 6b49ab0..58db752 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -384,6 +384,7 @@ public class ServiceTestUtils {
conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
try {
fs = new SliderFileSystem(conf);
+ fs.setAppDir(new Path(serviceBasePath.toString()));
} catch (IOException e) {
Throwables.propagate(e);
}
@@ -532,7 +533,6 @@ public class ServiceTestUtils {
GenericTestUtils.waitFor(() -> {
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
- System.out.println(retrievedApp);
return retrievedApp.getState() == desiredState;
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
index a37cabe..406eea4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@@ -65,7 +66,7 @@ public class TestServiceManager {
initUpgrade(context, "v2", false, false, false);
ServiceManager manager = context.getServiceManager();
//make components stable by upgrading all instances
- upgradeAllInstances(context);
+ upgradeAndReadyAllInstances(context);
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.START));
@@ -83,7 +84,7 @@ public class TestServiceManager {
initUpgrade(context, "v2", false, true, false);
ServiceManager manager = context.getServiceManager();
//make components stable by upgrading all instances
- upgradeAllInstances(context);
+ upgradeAndReadyAllInstances(context);
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
@@ -115,7 +116,7 @@ public class TestServiceManager {
manager.getServiceSpec().getState());
//make components stable by upgrading all instances
- upgradeAllInstances(context);
+ upgradeAndReadyAllInstances(context);
// finalize service
context.scheduler.getDispatcher().getEventHandler().handle(
@@ -138,7 +139,7 @@ public class TestServiceManager {
initUpgrade(context, "v2", true, true, false);
// make components stable
- upgradeAllInstances(context);
+ upgradeAndReadyAllInstances(context);
GenericTestUtils.waitFor(() ->
context.service.getState().equals(ServiceState.STABLE),
@@ -174,18 +175,17 @@ public class TestServiceManager {
public void testExpressUpgrade() throws Exception {
ServiceContext context = createServiceContext("testExpressUpgrade");
ServiceManager manager = context.getServiceManager();
- manager.getServiceSpec().setState(
- ServiceState.EXPRESS_UPGRADING);
+ manager.getServiceSpec().setState(ServiceState.EXPRESS_UPGRADING);
initUpgrade(context, "v2", true, true, true);
List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
- // wait till instances of first component are in upgrade
- String comp1 = comps.get(0);
- upgradeInstancesOf(context, comp1);
+ // wait till instances of first component are upgraded and ready
+ String compA = comps.get(0);
+ makeInstancesReadyAfterUpgrade(context, compA);
- // wait till instances of second component are in upgrade
- String comp2 = comps.get(1);
- upgradeInstancesOf(context, comp2);
+ // wait till instances of second component are upgraded and ready
+ String compB = comps.get(1);
+ makeInstancesReadyAfterUpgrade(context, compB);
GenericTestUtils.waitFor(() ->
context.service.getState().equals(ServiceState.STABLE),
@@ -196,6 +196,57 @@ public class TestServiceManager {
validateUpgradeFinalization(manager.getName(), "v2");
}
+ @Test(timeout = TIMEOUT)
+ public void testCancelUpgrade() throws Exception {
+ ServiceContext context = createServiceContext("testCancelUpgrade");
+ writeInitialDef(context.service);
+ initUpgrade(context, "v2", true, false, false);
+ ServiceManager manager = context.getServiceManager();
+ Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
+ manager.getServiceSpec().getState());
+
+ List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
+ // wait till instances of first component are upgraded and ready
+ String compA = comps.get(0);
+ // upgrade the instances
+ upgradeInstances(context, compA);
+ makeInstancesReadyAfterUpgrade(context, compA);
+
+ // cancel upgrade
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
+ makeInstancesReadyAfterUpgrade(context, compA);
+
+ GenericTestUtils.waitFor(()->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
+ Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE,
+ manager.getServiceSpec().getState());
+
+ validateUpgradeFinalization(manager.getName(), "v1");
+ }
+
+ @Test(timeout = TIMEOUT)
+ public void testCancelUpgradeAfterInitiate() throws Exception {
+ ServiceContext context = createServiceContext("testCancelUpgrade");
+ writeInitialDef(context.service);
+ initUpgrade(context, "v2", true, false, false);
+ ServiceManager manager = context.getServiceManager();
+ Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
+ manager.getServiceSpec().getState());
+
+ // cancel upgrade
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
+ GenericTestUtils.waitFor(()->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
+ Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE,
+ manager.getServiceSpec().getState());
+
+ validateUpgradeFinalization(manager.getName(), "v1");
+ }
+
private void validateUpgradeFinalization(String serviceName,
String expectedVersion) throws IOException {
Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
@@ -225,21 +276,23 @@ public class TestServiceManager {
}
writeUpgradedDef(upgradedDef);
serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
- ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
- upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade)
- .setAutoFinalize(autoFinalize);
-
- GenericTestUtils.waitFor(()-> {
- ServiceState serviceState = context.service.getState();
- if (serviceState.equals(ServiceState.UPGRADING) ||
- serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
- serviceState.equals(ServiceState.EXPRESS_UPGRADING)) {
- return true;
+ GenericTestUtils.waitFor(() -> {
+ for (Component comp : context.scheduler.getAllComponents().values()) {
+ if (!comp.getComponentSpec().getState().equals(
+ ComponentState.NEEDS_UPGRADE)) {
+ return false;
+ }
}
- return false;
+ return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
+ private void upgradeAndReadyAllInstances(ServiceContext context) throws
+ TimeoutException, InterruptedException {
+ upgradeAllInstances(context);
+ makeAllInstancesReady(context);
+ }
+
private void upgradeAllInstances(ServiceContext context) throws
TimeoutException, InterruptedException {
// upgrade the instances
@@ -248,8 +301,10 @@ public class TestServiceManager {
ComponentInstanceEventType.UPGRADE);
context.scheduler.getDispatcher().getEventHandler().handle(event);
}));
+ }
- // become ready
+ private void makeAllInstancesReady(ServiceContext context)
+ throws TimeoutException, InterruptedException {
context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
ComponentInstanceEventType.BECOME_READY);
@@ -267,7 +322,19 @@ public class TestServiceManager {
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
- private void upgradeInstancesOf(ServiceContext context, String compName)
+ private void upgradeInstances(ServiceContext context, String compName) {
+ Collection<ComponentInstance> compInstances = context.scheduler
+ .getAllComponents().get(compName).getAllComponentInstances();
+ compInstances.forEach(instance -> {
+ ComponentInstanceEvent event = new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.UPGRADE);
+ context.scheduler.getDispatcher().getEventHandler().handle(event);
+ });
+ }
+
+ private void makeInstancesReadyAfterUpgrade(ServiceContext context,
+ String compName)
throws TimeoutException, InterruptedException {
Collection<ComponentInstance> compInstances = context.scheduler
.getAllComponents().get(compName).getAllComponentInstances();
@@ -289,6 +356,15 @@ public class TestServiceManager {
context.scheduler.getDispatcher().getEventHandler().handle(event);
});
+
+ GenericTestUtils.waitFor(() -> {
+ for (ComponentInstance instance : compInstances) {
+ if (!instance.getContainerState().equals(ContainerState.READY)) {
+ return false;
+ }
+ }
+ return true;
+ }, CHECK_EVERY_MILLIS, TIMEOUT);
}
private ServiceContext createServiceContext(String name)
@@ -324,6 +400,14 @@ public class TestServiceManager {
return artifact;
}
+ private void writeInitialDef(Service service)
+ throws IOException, SliderException {
+ Path servicePath = rule.getFs().buildClusterDirPath(
+ service.getName());
+ ServiceApiUtil.createDirAndPersistApp(rule.getFs(), servicePath,
+ service);
+ }
+
private void writeUpgradedDef(Service upgradedDef)
throws IOException, SliderException {
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
@@ -332,6 +416,6 @@ public class TestServiceManager {
upgradedDef);
}
- private static final int TIMEOUT = 200000;
+ private static final int TIMEOUT = 10000;
private static final int CHECK_EVERY_MILLIS = 100;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 216d88f..3e23a10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
@@ -450,6 +451,49 @@ public class TestYarnNativeServices extends ServiceTestUtils {
client.actionDestroy(service.getName());
}
+ @Test(timeout = 200000)
+ public void testCancelUpgrade() throws Exception {
+ setupInternal(NUM_NMS);
+ getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true);
+ ServiceClient client = createClient(getConf());
+
+ Service service = createExampleApplication();
+ Component component = service.getComponents().iterator().next();
+ component.getConfiguration().getEnv().put("key1", "val0");
+
+ client.actionCreate(service);
+ waitForServiceToBeStable(client, service);
+
+ // upgrade the service
+ service.setState(ServiceState.UPGRADING);
+ service.setVersion("v2");
+ component.getConfiguration().getEnv().put("key1", "val1");
+ client.initiateUpgrade(service);
+
+ // wait for service to be in upgrade state
+ waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
+
+ // upgrade 1 container
+ Service liveService = client.getStatus(service.getName());
+ Container container = liveService.getComponent(component.getName())
+ .getContainers().iterator().next();
+ client.actionUpgrade(service, Lists.newArrayList(container));
+
+ Thread.sleep(500);
+ // cancel the upgrade
+ client.actionCancelUpgrade(service.getName());
+ waitForServiceToBeStable(client, service);
+ Service active = client.getStatus(service.getName());
+ Assert.assertEquals("component not stable", ComponentState.STABLE,
+ active.getComponent(component.getName()).getState());
+ Assert.assertEquals("comp does not have new env", "val0",
+ active.getComponent(component.getName()).getConfiguration()
+ .getEnv("key1"));
+ LOG.info("Stop/destroy service {}", service);
+ client.actionStop(service.getName(), true);
+ client.actionDestroy(service.getName());
+ }
+
// Test to verify ANTI_AFFINITY placement policy
// 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
// 2. Create an example service with 3 containers
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
index 0e047c2..41be8c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
@@ -221,6 +221,17 @@ public class TestServiceCLI {
Assert.assertEquals(result, 0);
}
+ @Test
+ public void testCancelUpgrade() throws Exception {
+ conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
+ DummyServiceClient.class.getName());
+ cli.setConf(conf);
+ String[] args = {"app", "-upgrade", "app-1",
+ "-cancel", "-appTypes", DUMMY_APP_TYPE};
+ int result = cli.run(ApplicationCLI.preProcessArgs(args));
+ Assert.assertEquals(result, 0);
+ }
+
@Test (timeout = 180000)
public void testEnableFastLaunch() throws Exception {
fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
@@ -332,5 +343,11 @@ public class TestServiceCLI {
throws IOException, YarnException {
return "";
}
+
+ @Override
+ public int actionCancelUpgrade(String appName) throws IOException,
+ YarnException {
+ return 0;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
index e1a4d9d..f11d871 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
@@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
-import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
@@ -38,9 +37,8 @@ import org.junit.Test;
import java.util.Iterator;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests for {@link Component}.
@@ -78,14 +76,14 @@ public class TestComponent {
"val1")).setUpgradeVersion("v2"));
// one instance finished upgrading
- comp.decContainersThatNeedUpgrade();
+ comp.getUpgradeStatus().decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
// second instance finished upgrading
- comp.decContainersThatNeedUpgrade();
+ comp.getUpgradeStatus().decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
@@ -97,7 +95,7 @@ public class TestComponent {
@Test
public void testContainerCompletedWhenUpgrading() throws Exception {
- String serviceName = "testContainerComplete";
+ String serviceName = "testContainerCompletedWhenUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
@@ -105,48 +103,233 @@ public class TestComponent {
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
- comp.getAllComponentInstances().forEach(instance -> {
- instance.handle(new ComponentInstanceEvent(
- instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
- });
- Iterator<ComponentInstance> instanceIter = comp.
- getAllComponentInstances().iterator();
+ comp.getAllComponentInstances().forEach(instance ->
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)));
// reinitialization of a container failed
- ContainerStatus status = mock(ContainerStatus.class);
- when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED);
- ComponentInstance instance = instanceIter.next();
+ for(ComponentInstance instance : comp.getAllComponentInstances()) {
+ ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
+ ComponentEventType.CONTAINER_COMPLETED)
+ .setInstance(instance)
+ .setContainerId(instance.getContainer().getId());
+ comp.handle(stopEvent);
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(), STOP));
+ }
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CHECK_STABLE));
+
+ Assert.assertEquals("component not in needs upgrade state",
+ ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+ }
+
+ @Test
+ public void testCancelUpgrade() throws Exception {
+ ServiceContext context = createTestContext(rule, "testCancelUpgrade");
+ Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+ .next().getValue();
+
+ ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(),
+ ComponentEventType.CANCEL_UPGRADE);
+ comp.handle(upgradeEvent);
+ Assert.assertEquals("component not in need upgrade state",
+ ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.service.component.ComponentState
+ .CANCEL_UPGRADING, comp.getState());
+ }
+
+ @Test
+ public void testContainerCompletedCancelUpgrade() throws Exception {
+ String serviceName = "testContainerCompletedCancelUpgrade";
+ MockRunningServiceContext context = createTestContext(rule, serviceName);
+ Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+ .next().getValue();
+
+ // upgrade completes
+ comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
+ .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
+ "val1")).setUpgradeVersion("v2"));
+ comp.getAllComponentInstances().forEach(instance ->
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.UPGRADE)));
+
+ // reinitialization of a container done
+ for(ComponentInstance instance : comp.getAllComponentInstances()) {
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(), BECOME_READY));
+ }
+
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CANCEL_UPGRADE)
+ .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
+ "val0")).setUpgradeVersion("v1"));
+ comp.getAllComponentInstances().forEach(instance ->
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.CANCEL_UPGRADE)));
+
+ Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
+ .iterator();
+
+    // cancel upgrade of a container failed
+ ComponentInstance instance1 = iter.next();
ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED)
- .setInstance(instance).setContainerId(instance.getContainer().getId())
- .setStatus(status);
+ .setInstance(instance1)
+ .setContainerId(instance1.getContainer().getId());
comp.handle(stopEvent);
- instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
- STOP).setStatus(status));
+ instance1.handle(new ComponentInstanceEvent(
+ instance1.getContainer().getId(), STOP));
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.service.component.ComponentState
+ .CANCEL_UPGRADING, comp.getState());
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
- Assert.assertEquals("component not in flexing state",
- ComponentState.FLEXING, comp.getComponentSpec().getState());
-
- // new container get allocated
- context.assignNewContainer(context.attemptId, 10, comp);
+ Assert.assertEquals("component not in needs upgrade state",
+ ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.service.component.ComponentState
+ .CANCEL_UPGRADING, comp.getState());
// second instance finished upgrading
- ComponentInstance instance2 = instanceIter.next();
+ ComponentInstance instance2 = iter.next();
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
+
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
+ Assert.assertEquals("component not in flexing state",
+ ComponentState.FLEXING, comp.getComponentSpec().getState());
+ // new container get allocated
+ context.assignNewContainer(context.attemptId, 10, comp);
+
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CHECK_STABLE));
+
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
- Assert.assertEquals("component did not upgrade successfully", "val1",
+ Assert.assertEquals("cancel upgrade failed", "val0",
+ comp.getComponentSpec().getConfiguration().getEnv("key1"));
+ }
+
+  @Test
+  public void testCancelUpgradeSuccessWhileUpgrading() throws Exception {
+    String serviceName = "testCancelUpgradeSuccessWhileUpgrading";
+    MockRunningServiceContext context = createTestContext(rule, serviceName);
+    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+        .next().getValue();
+    cancelUpgradeWhileUpgrading(context, comp);
+
+    // every instance reports ready after the cancel upgrade
+    for(ComponentInstance instance : comp.getAllComponentInstances()) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.BECOME_READY));
+    }
+
+    comp.handle(new ComponentEvent(comp.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in stable state",
+        ComponentState.STABLE, comp.getComponentSpec().getState());
+    Assert.assertEquals("cancel upgrade failed", "val0",
+        comp.getComponentSpec().getConfiguration().getEnv("key1"));
+  }
+
+ @Test
+ public void testCancelUpgradeFailureWhileUpgrading() throws Exception {
+ String serviceName = "testCancelUpgradeFailureWhileUpgrading";
+ MockRunningServiceContext context = createTestContext(rule, serviceName);
+ Component comp = context.scheduler.getAllComponents().entrySet().iterator()
+ .next().getValue();
+ cancelUpgradeWhileUpgrading(context, comp);
+
+ // cancel upgrade failed for both instances
+ for(ComponentInstance instance : comp.getAllComponentInstances()) {
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.STOP));
+ }
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CHECK_STABLE));
+
+ Assert.assertEquals("component not in flexing state",
+ ComponentState.FLEXING, comp.getComponentSpec().getState());
+
+ for (ComponentInstance instance : comp.getAllComponentInstances()) {
+ // new container get allocated
+ context.assignNewContainer(context.attemptId, 10, comp);
+ }
+
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CHECK_STABLE));
+
+ Assert.assertEquals("component not in stable state",
+ ComponentState.STABLE, comp.getComponentSpec().getState());
+ Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
+ private void cancelUpgradeWhileUpgrading(
+ MockRunningServiceContext context, Component comp)
+ throws Exception {
+
+ comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
+ .setTargetSpec(createSpecWithEnv(context.service.getName(),
+ comp.getName(), "key1", "val1")).setUpgradeVersion("v0"));
+
+ Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
+ .iterator();
+
+ ComponentInstance instance1 = iter.next();
+
+ // instance1 is triggered to upgrade
+ instance1.handle(new ComponentInstanceEvent(
+ instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
+
+ // component upgrade is cancelled
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CANCEL_UPGRADE)
+ .setTargetSpec(createSpecWithEnv(context.service.getName(),
+ comp.getName(), "key1",
+ "val0")).setUpgradeVersion("v0"));
+
+ // all instances upgrade is cancelled.
+ comp.getAllComponentInstances().forEach(instance ->
+ instance.handle(new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.CANCEL_UPGRADE)));
+
+ // regular upgrade failed for instance 1
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1)
+ .setContainerId(instance1.getContainer().getId()));
+ instance1.handle(new ComponentInstanceEvent(
+ instance1.getContainer().getId(), STOP));
+
+ // component should be in cancel upgrade
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.service.component.ComponentState
+ .CANCEL_UPGRADING, comp.getState());
+
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CHECK_STABLE));
+
+ Assert.assertEquals("component not in needs upgrade state",
+ ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.service.component.ComponentState
+ .CANCEL_UPGRADING, comp.getState());
+ }
+
@Test
public void testComponentStateReachesStableStateWithTerminatingComponents()
throws
@@ -249,8 +432,6 @@ public class TestComponent {
serviceState);
}
-
-
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
index e039981..c5a9631 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.component.instance;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -86,7 +87,7 @@ public class TestComponentInstance {
@Test
public void testContainerReadyAfterUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
- "testContainerStarted");
+ "testContainerReadyAfterUpgrade");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
@@ -105,12 +106,186 @@ public class TestComponentInstance {
.getId().toString()).getState());
}
+
+ @Test
+ public void testContainerUpgradeFailed() throws Exception {
+ ServiceContext context = TestComponent.createTestContext(rule,
+ "testContainerUpgradeFailed");
+ Component component = context.scheduler.getAllComponents().entrySet()
+ .iterator().next().getValue();
+ upgradeComponent(component);
+
+ ComponentInstance instance = component.getAllComponentInstances().iterator()
+ .next();
+
+ ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+ instance.handle(upgradeEvent);
+
+ ContainerStatus containerStatus = mock(ContainerStatus.class);
+ when(containerStatus.getExitStatus()).thenReturn(
+ ContainerExitStatus.ABORTED);
+ ComponentInstanceEvent stopEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(), ComponentInstanceEventType.STOP)
+ .setStatus(containerStatus);
+ // this is the call back from NM for the upgrade
+ instance.handle(stopEvent);
+ Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE,
+ component.getComponentSpec().getContainer(instance.getContainer()
+ .getId().toString()).getState());
+ }
+
+  @Test
+  public void testCancelNothingToUpgrade() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testCancelNothingToUpgrade");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    cancelCompUpgrade(component);
+
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+
+    ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(),
+        ComponentInstanceEventType.CANCEL_UPGRADE);
+    instance.handle(cancelEvent);
+
+    Assert.assertEquals("instance not ready", ContainerState.READY,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+  }
+
+ @Test
+ public void testCancelUpgradeFailed() throws Exception {
+ ServiceContext context = TestComponent.createTestContext(rule,
+ "testCancelUpgradeFailed");
+ Component component = context.scheduler.getAllComponents().entrySet()
+ .iterator().next().getValue();
+ cancelCompUpgrade(component);
+
+ ComponentInstance instance = component.getAllComponentInstances().iterator()
+ .next();
+
+ ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.CANCEL_UPGRADE);
+ instance.handle(cancelEvent);
+
+ instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
+ ComponentInstanceEventType.STOP));
+ Assert.assertEquals("instance not init", ComponentInstanceState.INIT,
+ instance.getState());
+ }
+
+  @Test
+  public void testCancelAfterCompProcessedCancel() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testCancelAfterCompProcessedCancel");
+    Component component = context.scheduler.getAllComponents().entrySet()
+        .iterator().next().getValue();
+    upgradeComponent(component);
+    cancelCompUpgrade(component);
+
+    ComponentInstance instance = component.getAllComponentInstances().iterator()
+        .next();
+    ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+        instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+    instance.handle(upgradeEvent);
+
+    Assert.assertEquals("instance not in needs upgrade state",
+        ContainerState.NEEDS_UPGRADE,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
+  }
+
+ @Test
+ public void testCancelWhileUpgradeWithSuccess() throws Exception {
+ validateCancelWhileUpgrading(true, true);
+ }
+
+ @Test
+ public void testCancelWhileUpgradeWithFailure() throws Exception {
+ validateCancelWhileUpgrading(false, true);
+ }
+
+ @Test
+ public void testCancelFailedWhileUpgradeWithSuccess() throws Exception {
+ validateCancelWhileUpgrading(true, false);
+ }
+
+ @Test
+ public void testCancelFailedWhileUpgradeWithFailure() throws Exception {
+ validateCancelWhileUpgrading(false, false);
+ }
+
+ private void validateCancelWhileUpgrading(boolean upgradeSuccessful,
+ boolean cancelUpgradeSuccessful)
+ throws Exception {
+ ServiceContext context = TestComponent.createTestContext(rule,
+ "testCancelWhileUpgrading");
+ Component component = context.scheduler.getAllComponents().entrySet()
+ .iterator().next().getValue();
+ upgradeComponent(component);
+
+ ComponentInstance instance = component.getAllComponentInstances().iterator()
+ .next();
+ ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+ instance.handle(upgradeEvent);
+
+ Assert.assertEquals("instance should be upgrading",
+ ContainerState.UPGRADING,
+ component.getComponentSpec().getContainer(instance.getContainer()
+ .getId().toString()).getState());
+
+ cancelCompUpgrade(component);
+ ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.CANCEL_UPGRADE);
+ instance.handle(cancelEvent);
+
+ // either upgrade failed or successful
+ ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
+ ComponentInstanceEventType.STOP);
+
+ instance.handle(readyOrStopEvent);
+ Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
+ component.getComponentSpec().getContainer(instance.getContainer()
+ .getId().toString()).getState());
+
+ // response for cancel received
+ ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
+ ComponentInstanceEventType.STOP);
+
+ instance.handle(readyOrStopCancel);
+ if (cancelUpgradeSuccessful) {
+ Assert.assertEquals("instance not ready", ContainerState.READY,
+ component.getComponentSpec().getContainer(instance.getContainer()
+ .getId().toString()).getState());
+ } else {
+ Assert.assertEquals("instance not init", ComponentInstanceState.INIT,
+ instance.getState());
+ }
+ }
+
private void upgradeComponent(Component component) {
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec())
.setUpgradeVersion("v2"));
}
+ private void cancelCompUpgrade(Component component) {
+ component.handle(new ComponentEvent(component.getName(),
+ ComponentEventType.CANCEL_UPGRADE)
+ .setTargetSpec(component.getComponentSpec())
+ .setUpgradeVersion("v1"));
+ }
+
private Component createComponent(ServiceScheduler scheduler,
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy, int nSucceededInstances, int nFailedInstances,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..81a3f6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index a0e4e02..b0e12bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -102,6 +102,7 @@ public class ApplicationCLI extends YarnCLI {
public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch";
public static final String UPGRADE_CMD = "upgrade";
public static final String UPGRADE_EXPRESS = "express";
+ public static final String UPGRADE_CANCEL = "cancel";
public static final String UPGRADE_INITIATE = "initiate";
public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
public static final String UPGRADE_FINALIZE = "finalize";
@@ -265,6 +266,8 @@ public class ApplicationCLI extends YarnCLI {
opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
"-initiate options to initiate the upgrade of the application with " +
"the ability to finalize the upgrade automatically.");
+ opts.addOption(UPGRADE_CANCEL, false, "Works with -upgrade option to " +
+ "cancel current upgrade.");
opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
opts.getOption(LAUNCH_CMD).setArgs(2);
opts.getOption(START_CMD).setArgName("Application Name");
@@ -646,7 +649,7 @@ public class ApplicationCLI extends YarnCLI {
} else if (cliParser.hasOption(UPGRADE_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS,
UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE,
- COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
+ UPGRADE_CANCEL, COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
@@ -697,6 +700,13 @@ public class ApplicationCLI extends YarnCLI {
return exitCode;
}
return client.actionStart(appName);
+ } else if (cliParser.hasOption(UPGRADE_CANCEL)) {
+ if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
+ UPGRADE_CANCEL, APP_TYPE_CMD)) {
+ printUsage(title, opts);
+ return exitCode;
+ }
+ return client.actionCancelUpgrade(appName);
}
} else {
syserr.println("Invalid Command Usage : ");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index a600895..f795db5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -2143,6 +2143,8 @@ public class TestYarnCLI {
pw.println(" the upgrade of the application");
pw.println(" with the ability to finalize the");
pw.println(" upgrade automatically.");
+ pw.println(" -cancel Works with -upgrade option to");
+ pw.println(" cancel current upgrade.");
pw.println(" -changeQueue <Queue Name> Moves application to a new");
pw.println(" queue. ApplicationId can be");
pw.println(" passed using 'appId' option.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
index 232666d..df11ffd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
@@ -300,4 +300,17 @@ public abstract class AppAdminClient extends CompositeService {
@Unstable
public abstract int actionUpgradeExpress(String appName, File fileName)
throws IOException, YarnException;
+
+ /**
+ * Cancels the upgrade of the service.
+ *
+ * @param appName the name of the application
+ * @return exit code
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ @Unstable
+ public abstract int actionCancelUpgrade(String appName) throws IOException,
+ YarnException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 27a7c80..01d70af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -1836,6 +1836,7 @@ public class ContainerManagerImpl extends CompositeService implements
public void reInitializeContainer(ContainerId containerId,
ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
throws YarnException {
+ LOG.debug("{} requested reinit", containerId);
Container container = preReInitializeOrLocalizeCheck(containerId,
ReInitOp.RE_INIT);
ResourceSet resourceSet = new ResourceSet();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-8665. Added Yarn service cancel upgrade
option. Contributed by Chandni Singh
Posted by ey...@apache.org.
YARN-8665. Added Yarn service cancel upgrade option.
Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/913f87da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/913f87da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/913f87da
Branch: refs/heads/trunk
Commit: 913f87dada27776c539dfb352400ecf8d40e7943
Parents: e0ff8e2
Author: Eric Yang <ey...@apache.org>
Authored: Wed Sep 26 14:51:35 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Wed Sep 26 14:51:35 2018 -0400
----------------------------------------------------------------------
.../yarn/service/client/ApiServiceClient.java | 20 ++
.../hadoop/yarn/service/webapp/ApiServer.java | 34 +-
.../hadoop/yarn/service/ClientAMProtocol.java | 5 +
.../hadoop/yarn/service/ClientAMService.java | 12 +
.../hadoop/yarn/service/ServiceEvent.java | 14 +-
.../hadoop/yarn/service/ServiceEventType.java | 3 +-
.../hadoop/yarn/service/ServiceManager.java | 331 +++++++++++++------
.../service/api/records/ContainerState.java | 2 +-
.../yarn/service/api/records/ServiceState.java | 7 +-
.../yarn/service/client/ServiceClient.java | 21 ++
.../yarn/service/component/Component.java | 275 ++++++++++-----
.../yarn/service/component/ComponentEvent.java | 10 -
.../service/component/ComponentEventType.java | 1 +
.../yarn/service/component/ComponentState.java | 3 +-
.../component/instance/ComponentInstance.java | 269 ++++++++++++---
.../instance/ComponentInstanceEventType.java | 3 +-
.../instance/ComponentInstanceState.java | 3 +-
.../containerlaunch/ContainerLaunchService.java | 4 +-
.../pb/client/ClientAMProtocolPBClientImpl.java | 13 +
.../service/ClientAMProtocolPBServiceImpl.java | 13 +
.../yarn/service/provider/ProviderUtils.java | 9 +-
.../yarn/service/utils/ServiceApiUtil.java | 14 +-
.../yarn/service/utils/SliderFileSystem.java | 49 +++
.../src/main/proto/ClientAMProtocol.proto | 8 +
.../yarn/service/MockRunningServiceContext.java | 20 +-
.../hadoop/yarn/service/ServiceTestUtils.java | 2 +-
.../hadoop/yarn/service/TestServiceManager.java | 136 ++++++--
.../yarn/service/TestYarnNativeServices.java | 44 +++
.../yarn/service/client/TestServiceCLI.java | 17 +
.../yarn/service/component/TestComponent.java | 239 +++++++++++--
.../instance/TestComponentInstance.java | 177 +++++++++-
.../src/test/resources/log4j.properties | 19 ++
.../hadoop/yarn/client/cli/ApplicationCLI.java | 12 +-
.../hadoop/yarn/client/cli/TestYarnCLI.java | 2 +
.../hadoop/yarn/client/api/AppAdminClient.java | 13 +
.../containermanager/ContainerManagerImpl.java | 1 +
36 files changed, 1470 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index ca6cc50..b7a1541 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -723,4 +723,24 @@ public class ApiServiceClient extends AppAdminClient {
}
return null;
}
+
+ @Override
+ public int actionCancelUpgrade(
+ String appName) throws IOException, YarnException {
+ int result;
+ try {
+ Service service = new Service();
+ service.setName(appName);
+ service.setState(ServiceState.CANCEL_UPGRADING);
+ String buffer = jsonSerDeser.toJson(service);
+ LOG.info("Cancel upgrade in progress. Please wait..");
+ ClientResponse response = getApiClient(getServicePath(appName))
+ .put(ClientResponse.class, buffer);
+ result = processResponse(response);
+ } catch (Exception e) {
+ LOG.error("Failed to cancel upgrade: ", e);
+ result = EXIT_EXCEPTION_THROWN;
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index cd6f0d7..c4e3317 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -58,6 +58,7 @@ import java.util.*;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
+import static org.apache.hadoop.yarn.service.api.records.ServiceState.CANCEL_UPGRADING;
import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
@@ -445,6 +446,12 @@ public class ApiServer {
return upgradeService(updateServiceData, ugi);
}
+ // If CANCEL_UPGRADING is requested
+ if (updateServiceData.getState() != null &&
+ updateServiceData.getState() == CANCEL_UPGRADING) {
+ return cancelUpgradeService(appName, ugi);
+ }
+
// If new lifetime value specified then update it
if (updateServiceData.getLifetime() != null
&& updateServiceData.getLifetime() > 0) {
@@ -460,8 +467,7 @@ public class ApiServer {
LOG.error(message, e);
return formatResponse(Status.NOT_FOUND, e.getMessage());
} catch (YarnException e) {
- String message = "Service is not found in hdfs: " + appName;
- LOG.error(message, e);
+ LOG.error(e.getMessage(), e);
return formatResponse(Status.NOT_FOUND, e.getMessage());
} catch (Exception e) {
String message = "Error while performing operation for app: " + appName;
@@ -707,6 +713,27 @@ public class ApiServer {
return formatResponse(Status.ACCEPTED, status);
}
+ private Response cancelUpgradeService(String serviceName,
+ final UserGroupInformation ugi) throws IOException, InterruptedException {
+ int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
+ ServiceClient sc = getServiceClient();
+ sc.init(YARN_CONFIG);
+ sc.start();
+ int exitCode = sc.actionCancelUpgrade(serviceName);
+ sc.close();
+ return exitCode;
+ });
+ if (result == EXIT_SUCCESS) {
+ ServiceStatus status = new ServiceStatus();
+ LOG.info("Service {} cancelling upgrade", serviceName);
+ status.setDiagnostics("Service " + serviceName +
+ " cancelling upgrade.");
+ status.setState(ServiceState.ACCEPTED);
+ return formatResponse(Status.ACCEPTED, status);
+ }
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+
private Response processComponentsUpgrade(UserGroupInformation ugi,
String serviceName, Set<String> compNames) throws YarnException,
IOException, InterruptedException {
@@ -734,7 +761,8 @@ public class ApiServer {
Service service, List<Container> containers) throws YarnException,
IOException, InterruptedException {
- if (service.getState() != ServiceState.UPGRADING) {
+ if (!service.getState().equals(ServiceState.UPGRADING) &&
+ !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
throw new YarnException(
String.format("The upgrade of service %s has not been initiated.",
service.getName()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
index 652a314..39e7dfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -60,4 +62,7 @@ public interface ClientAMProtocol {
GetCompInstancesResponseProto getCompInstances(
GetCompInstancesRequestProto request) throws IOException, YarnException;
+
+ CancelUpgradeResponseProto cancelUpgrade(
+ CancelUpgradeRequestProto request) throws IOException, YarnException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 2ef8f7e..47e9829 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
@@ -208,4 +210,14 @@ public class ClientAMService extends AbstractService
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
new Container[containers.size()]))).build();
}
+
+ @Override
+ public CancelUpgradeResponseProto cancelUpgrade(
+ CancelUpgradeRequestProto request) throws IOException, YarnException {
+ LOG.info("Cancel service upgrade by {}",
+ UserGroupInformation.getCurrentUser());
+ ServiceEvent event = new ServiceEvent(ServiceEventType.CANCEL_UPGRADE);
+ context.scheduler.getDispatcher().getEventHandler().handle(event);
+ return CancelUpgradeResponseProto.newBuilder().build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
index 3a55472..cf4455d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.service.api.records.Component;
-import java.util.Queue;
+import java.util.List;
/**
* Events are handled by {@link ServiceManager} to manage the service
@@ -33,7 +33,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
private String version;
private boolean autoFinalize;
private boolean expressUpgrade;
- private Queue<Component> compsToUpgradeInOrder;
+ // For express upgrade they should be in order.
+ private List<Component> compsToUpgrade;
public ServiceEvent(ServiceEventType serviceEventType) {
super(serviceEventType);
@@ -71,13 +72,12 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
return this;
}
- public Queue<Component> getCompsToUpgradeInOrder() {
- return compsToUpgradeInOrder;
+ public List<Component> getCompsToUpgrade() {
+ return compsToUpgrade;
}
- public ServiceEvent setCompsToUpgradeInOrder(
- Queue<Component> compsToUpgradeInOrder) {
- this.compsToUpgradeInOrder = compsToUpgradeInOrder;
+ public ServiceEvent setCompsToUpgrade(List<Component> compsToUpgrade) {
+ this.compsToUpgrade = compsToUpgrade;
return this;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
index 4fc420b..03afdd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
@@ -24,5 +24,6 @@ package org.apache.hadoop.yarn.service;
public enum ServiceEventType {
START,
UPGRADE,
- CHECK_STABLE
+ CHECK_STABLE,
+ CANCEL_UPGRADE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index 04454b1..4851325 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -20,32 +20,31 @@ package org.apache.hadoop.yarn.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
-import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
-import org.apache.hadoop.yarn.state.MultipleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
-import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@@ -71,8 +70,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
private final SliderFileSystem fs;
private String upgradeVersion;
- private Queue<org.apache.hadoop.yarn.service.api.records
- .Component> compsToUpgradeInOrder;
+ private List<org.apache.hadoop.yarn.service.api.records
+ .Component> componentsToUpgrade;
+ private List<String> compsAffectedByUpgrade = new ArrayList<>();
+ private String cancelledVersion;
private static final StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@@ -88,11 +89,14 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.START,
- new CheckStableTransition())
+ new StartFromUpgradeTransition())
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.CHECK_STABLE,
new CheckStableTransition())
+
+ .addTransition(State.UPGRADING, State.UPGRADING,
+ ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition())
.installTopology();
public ServiceManager(ServiceContext context) {
@@ -148,19 +152,25 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
serviceManager.upgradeVersion = event.getVersion();
+ serviceManager.componentsToUpgrade = event.getCompsToUpgrade();
+ event.getCompsToUpgrade().forEach(comp ->
+ serviceManager.compsAffectedByUpgrade.add(comp.getName()));
try {
if (event.isExpressUpgrade()) {
- serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
- serviceManager.compsToUpgradeInOrder = event
- .getCompsToUpgradeInOrder();
- serviceManager.upgradeNextCompIfAny();
+ serviceManager.dispatchNeedUpgradeEvents(false);
+ serviceManager.upgradeNextCompIfAny(false);
+ } else {
+ serviceManager.dispatchNeedUpgradeEvents(false);
+ }
+
+ if (event.isExpressUpgrade()) {
+ serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING);
} else if (event.isAutoFinalize()) {
- serviceManager.serviceSpec.setState(ServiceState
- .UPGRADING_AUTO_FINALIZE);
+ serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE);
} else {
- serviceManager.serviceSpec.setState(
- ServiceState.UPGRADING);
+ serviceManager.setServiceState(ServiceState.UPGRADING);
}
+
return State.UPGRADING;
} catch (Throwable e) {
LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@@ -181,24 +191,32 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
if (currState.equals(ServiceState.STABLE)) {
return State.STABLE;
}
- if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
- org.apache.hadoop.yarn.service.api.records.Component component =
- serviceManager.compsToUpgradeInOrder.peek();
- if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
- !component.getState().equals(ComponentState.UPGRADING)) {
- serviceManager.compsToUpgradeInOrder.remove();
+ if (currState.equals(ServiceState.EXPRESS_UPGRADING) ||
+ currState.equals(ServiceState.CANCEL_UPGRADING)) {
+
+ if (!serviceManager.componentsToUpgrade.isEmpty()) {
+ org.apache.hadoop.yarn.service.api.records.Component compSpec =
+ serviceManager.componentsToUpgrade.get(0);
+ Component component = serviceManager.scheduler.getAllComponents()
+ .get(compSpec.getName());
+
+ if (!component.isUpgrading()) {
+ serviceManager.componentsToUpgrade.remove(0);
+ serviceManager.upgradeNextCompIfAny(
+ currState.equals(ServiceState.CANCEL_UPGRADING));
+ }
}
- serviceManager.upgradeNextCompIfAny();
}
+
if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
- event.getType().equals(ServiceEventType.START) ||
- (currState.equals(ServiceState.EXPRESS_UPGRADING) &&
- serviceManager.compsToUpgradeInOrder.isEmpty())) {
+ ((currState.equals(ServiceState.EXPRESS_UPGRADING) ||
+ currState.equals(ServiceState.CANCEL_UPGRADING)) &&
+ serviceManager.componentsToUpgrade.isEmpty())) {
+
ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
if (targetState.equals(ServiceState.STABLE)) {
- if (serviceManager.finalizeUpgrade()) {
- LOG.info("Service def state changed from {} -> {}", currState,
- serviceManager.serviceSpec.getState());
+ if (serviceManager.finalizeUpgrade(
+ currState.equals(ServiceState.CANCEL_UPGRADING))) {
return State.STABLE;
}
}
@@ -207,52 +225,149 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
}
}
- private void upgradeNextCompIfAny() {
- if (!compsToUpgradeInOrder.isEmpty()) {
+ private static class StartFromUpgradeTransition implements
+ MultipleArcTransition<ServiceManager, ServiceEvent, State> {
+
+ @Override
+ public State transition(ServiceManager serviceManager, ServiceEvent event) {
+ ServiceState currState = serviceManager.serviceSpec.getState();
+
+ ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
+ if (targetState.equals(ServiceState.STABLE)) {
+ if (serviceManager.finalizeUpgrade(
+ currState.equals(ServiceState.CANCEL_UPGRADING))) {
+ return State.STABLE;
+ }
+ }
+ return State.UPGRADING;
+ }
+ }
+
+ private static class CancelUpgradeTransition implements
+ SingleArcTransition<ServiceManager, ServiceEvent> {
+
+ @Override
+ public void transition(ServiceManager serviceManager,
+ ServiceEvent event) {
+ if (!serviceManager.getState().equals(State.UPGRADING)) {
+ LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state",
+ serviceManager.getState());
+ return;
+ }
+ try {
+ Service targetSpec = ServiceApiUtil.loadService(
+ serviceManager.context.fs, serviceManager.getName());
+
+ Service sourceSpec = ServiceApiUtil.loadServiceUpgrade(
+ serviceManager.context.fs, serviceManager.getName(),
+ serviceManager.upgradeVersion);
+ serviceManager.cancelledVersion = serviceManager.upgradeVersion;
+ LOG.info("[SERVICE] cancel version {}",
+ serviceManager.cancelledVersion);
+ serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion();
+ serviceManager.componentsToUpgrade = serviceManager
+ .resolveCompsToUpgrade(sourceSpec, targetSpec);
+
+ serviceManager.compsAffectedByUpgrade.clear();
+ serviceManager.componentsToUpgrade.forEach(comp ->
+ serviceManager.compsAffectedByUpgrade.add(comp.getName()));
+
+ serviceManager.dispatchNeedUpgradeEvents(true);
+ serviceManager.upgradeNextCompIfAny(true);
+ serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING);
+ } catch (Throwable e) {
+ LOG.error("[SERVICE]: Cancellation of upgrade failed", e);
+ }
+ }
+ }
+
+ private void upgradeNextCompIfAny(boolean cancelUpgrade) {
+ if (!componentsToUpgrade.isEmpty()) {
org.apache.hadoop.yarn.service.api.records.Component component =
- compsToUpgradeInOrder.peek();
+ componentsToUpgrade.get(0);
+
+ serviceSpec.getComponent(component.getName()).getContainers().forEach(
+ container -> {
+ ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+ ContainerId.fromString(container.getId()),
+ !cancelUpgrade ? ComponentInstanceEventType.UPGRADE :
+ ComponentInstanceEventType.CANCEL_UPGRADE);
+ LOG.info("Upgrade container {} {}", container.getId(),
+ cancelUpgrade);
+ dispatcher.getEventHandler().handle(upgradeEvent);
+ });
+ }
+ }
- ComponentEvent needUpgradeEvent = new ComponentEvent(
- component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
- component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
- context.scheduler.getDispatcher().getEventHandler().handle(
- needUpgradeEvent);
+ private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) {
+ if (componentsToUpgrade != null) {
+ componentsToUpgrade.forEach(component -> {
+ ComponentEvent needUpgradeEvent = new ComponentEvent(
+ component.getName(), !cancelUpgrade ? ComponentEventType.UPGRADE :
+ ComponentEventType.CANCEL_UPGRADE)
+ .setTargetSpec(component)
+ .setUpgradeVersion(upgradeVersion);
+ LOG.info("Upgrade component {} {}", component.getName(), cancelUpgrade);
+ context.scheduler.getDispatcher().getEventHandler()
+ .handle(needUpgradeEvent);
+ });
}
}
/**
* @return whether finalization of upgrade was successful.
*/
- private boolean finalizeUpgrade() {
- try {
- // save the application id and state to
- Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
- fs, getName(), upgradeVersion);
- targetSpec.setId(serviceSpec.getId());
- targetSpec.setState(ServiceState.STABLE);
- Map<String, Component> allComps = scheduler.getAllComponents();
- targetSpec.getComponents().forEach(compSpec -> {
- Component comp = allComps.get(compSpec.getName());
- compSpec.setState(comp.getComponentSpec().getState());
- });
- jsonSerDeser.save(fs.getFileSystem(),
- ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
- fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
- } catch (IOException e) {
- LOG.error("Upgrade did not complete because unable to re-write the" +
- " service definition", e);
- return false;
+ private boolean finalizeUpgrade(boolean cancelUpgrade) {
+ if (!cancelUpgrade) {
+ try {
+ // save the application id and state to the target spec
+ Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
+ fs, getName(), upgradeVersion);
+ targetSpec.setId(serviceSpec.getId());
+ targetSpec.setState(ServiceState.STABLE);
+ Map<String, Component> allComps = scheduler.getAllComponents();
+ targetSpec.getComponents().forEach(compSpec -> {
+ Component comp = allComps.get(compSpec.getName());
+ compSpec.setState(comp.getComponentSpec().getState());
+ });
+ jsonSerDeser.save(fs.getFileSystem(),
+ ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
+ } catch (IOException e) {
+ LOG.error("Upgrade did not complete because unable to re-write the" +
+ " service definition", e);
+ return false;
+ }
}
try {
- fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
+ String upgradeVersionToDel = cancelUpgrade? cancelledVersion :
+ upgradeVersion;
+ LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel);
+ fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel);
+
+ for (String comp : compsAffectedByUpgrade) {
+ String compDirVersionToDel = cancelUpgrade? cancelledVersion :
+ serviceSpec.getVersion();
+ LOG.info("[SERVICE]: delete {} dir version {}", comp,
+ compDirVersionToDel);
+ fs.deleteComponentDir(compDirVersionToDel, comp);
+ }
+
+ if (cancelUpgrade) {
+ fs.deleteComponentsVersionDirIfEmpty(cancelledVersion);
+ } else {
+ fs.deleteComponentsVersionDirIfEmpty(serviceSpec.getVersion());
+ }
+
} catch (IOException e) {
LOG.warn("Unable to delete upgrade definition for service {} " +
"version {}", getName(), upgradeVersion);
}
- serviceSpec.setState(ServiceState.STABLE);
+ setServiceState(ServiceState.STABLE);
serviceSpec.setVersion(upgradeVersion);
upgradeVersion = null;
+ cancelledVersion = null;
+ compsAffectedByUpgrade.clear();
return true;
}
@@ -291,30 +406,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
context.fs, context.service.getName(), upgradeVersion);
List<org.apache.hadoop.yarn.service.api.records.Component>
- compsNeedUpgradeList = componentsFinder.
- findTargetComponentSpecs(context.service, targetSpec);
-
- // remove all components from need upgrade list if there restart policy
- // doesn't all upgrade.
- if (compsNeedUpgradeList != null) {
- compsNeedUpgradeList.removeIf(component -> {
- org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
- restartPolicy = component.getRestartPolicy();
-
- final ComponentRestartPolicy restartPolicyHandler =
- Component.getRestartPolicyHandler(restartPolicy);
- // Do not allow upgrades for components which have NEVER/ON_FAILURE
- // restart policy
- if (!restartPolicyHandler.allowUpgrades()) {
- LOG.info("The component {} has a restart policy that doesnt " +
- "allow upgrades {} ", component.getName(),
- component.getRestartPolicy().toString());
- return true;
- }
-
- return false;
- });
- }
+ compsNeedUpgradeList = resolveCompsToUpgrade(context.service,
+ targetSpec);
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
.setVersion(upgradeVersion)
@@ -334,7 +427,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
List<String> resolvedComps = ServiceApiUtil
.resolveCompsDependency(targetSpec);
- Queue<org.apache.hadoop.yarn.service.api.records.Component>
+ List<org.apache.hadoop.yarn.service.api.records.Component>
orderedCompUpgrade = new LinkedList<>();
resolvedComps.forEach(compName -> {
org.apache.hadoop.yarn.service.api.records.Component component =
@@ -343,30 +436,68 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
orderedCompUpgrade.add(component);
}
});
- event.setCompsToUpgradeInOrder(orderedCompUpgrade);
+ event.setCompsToUpgrade(orderedCompUpgrade);
+ } else {
+ event.setCompsToUpgrade(compsNeedUpgradeList);
}
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ event);
- context.scheduler.getDispatcher().getEventHandler().handle(event);
-
- if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
- if (!expressUpgrade) {
- compsNeedUpgradeList.forEach(component -> {
- ComponentEvent needUpgradeEvent = new ComponentEvent(
- component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
- component).setUpgradeVersion(event.getVersion());
- context.scheduler.getDispatcher().getEventHandler().handle(
- needUpgradeEvent);
-
- });
- }
- } else if (autoFinalize) {
- // nothing to upgrade if upgrade auto finalize is requested, trigger a
+ if (autoFinalize && (compsNeedUpgradeList == null ||
+ compsNeedUpgradeList.isEmpty())) {
+ // nothing to upgrade and auto finalize is requested, trigger a
// state check.
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CHECK_STABLE));
}
}
+ private List<org.apache.hadoop.yarn.service.api.records.Component>
+ resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) {
+
+ List<org.apache.hadoop.yarn.service.api.records.Component>
+ compsNeedUpgradeList = componentsFinder.
+ findTargetComponentSpecs(sourceSpec, targetSpec);
+
+ // remove all components from need upgrade list if their restart policy
+ // doesn't allow upgrade.
+ if (compsNeedUpgradeList != null) {
+ compsNeedUpgradeList.removeIf(component -> {
+ org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
+ restartPolicy = component.getRestartPolicy();
+
+ final ComponentRestartPolicy restartPolicyHandler =
+ Component.getRestartPolicyHandler(restartPolicy);
+ // Do not allow upgrades for components which have NEVER/ON_FAILURE
+ // restart policy
+ if (!restartPolicyHandler.allowUpgrades()) {
+ LOG.info("The component {} has a restart policy that doesnt " +
+ "allow upgrades {} ", component.getName(),
+ component.getRestartPolicy().toString());
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ return compsNeedUpgradeList;
+ }
+
+ /**
+ * Sets the state of the service in the service spec.
+ * @param state service state
+ */
+ private void setServiceState(
+ org.apache.hadoop.yarn.service.api.records.ServiceState state) {
+ org.apache.hadoop.yarn.service.api.records.ServiceState curState =
+ serviceSpec.getState();
+ if (!curState.equals(state)) {
+ serviceSpec.setState(state);
+ LOG.info("[SERVICE] spec state changed from {} -> {}", curState, state);
+ }
+ }
+
/**
* Returns the name of the service.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
index cac527a..a6e9a2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
@@ -27,5 +27,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public enum ContainerState {
RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
- FAILED;
+ FAILED, FAILED_UPGRADE;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index 49c1985..3f2f4f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -29,5 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability;
@ApiModel(description = "The current state of an service.")
public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
- UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
+ UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED, CANCEL_UPGRADING;
+
+ public static boolean isUpgrading(ServiceState state) {
+ return state.equals(UPGRADING) || state.equals(UPGRADING_AUTO_FINALIZE)
+ || state.equals(EXPRESS_UPGRADING);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index a27ed87..23db57e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@@ -353,6 +354,26 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
@Override
+ public int actionCancelUpgrade(String appName) throws IOException,
+ YarnException {
+ Service liveService = getStatus(appName);
+ if (liveService == null ||
+ !ServiceState.isUpgrading(liveService.getState())) {
+ throw new YarnException("Service " + appName + " is not upgrading, " +
+ "so nothing to cancel.");
+ }
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(
+ getAppId(appName));
+ if (StringUtils.isEmpty(appReport.getHost())) {
+ throw new YarnException(appName + " AM hostname is empty");
+ }
+ ClientAMProtocol proxy = createAMProxy(appName, appReport);
+ proxy.cancelUpgrade(CancelUpgradeRequestProto.newBuilder().build());
+ return EXIT_SUCCESS;
+ }
+
+ @Override
public int actionCleanUp(String appName, String userName) throws
IOException, YarnException {
if (cleanUpRegistry(appName, userName)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index acf3404..526bde0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import static org.apache.hadoop.yarn.service.api.records.Component
@@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext;
@@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE;
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
@@ -126,9 +126,8 @@ public class Component implements EventHandler<ComponentEvent> {
new ConcurrentHashMap<>();
private boolean healthThresholdMonitorEnabled = false;
- private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
- private ComponentEvent upgradeEvent;
- private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
+ private UpgradeStatus upgradeStatus = new UpgradeStatus();
+ private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus();
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
stateMachine;
@@ -160,6 +159,8 @@ public class Component implements EventHandler<ComponentEvent> {
// Flex while previous flex is still in progress
.addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
new FlexComponentTransition())
+ .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+ CHECK_STABLE, new CheckStableTransition())
// container failed while stable
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
@@ -172,19 +173,28 @@ public class Component implements EventHandler<ComponentEvent> {
// For flex down, go to STABLE state
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
- .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
- new ComponentNeedsUpgradeTransition())
- //Upgrade while previous upgrade is still in progress
- .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
- new ComponentNeedsUpgradeTransition())
- .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
- CHECK_STABLE, new CheckStableTransition())
- .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
- CHECK_STABLE, new CheckStableTransition())
+ .addTransition(STABLE, UPGRADING, UPGRADE,
+ new NeedsUpgradeTransition())
+ .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
+ new NeedsUpgradeTransition())
.addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
new CheckStableTransition())
- .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
- new ContainerCompletedTransition())
+
+ // Cancel upgrade while previous upgrade is still in progress
+ .addTransition(UPGRADING, CANCEL_UPGRADING,
+ CANCEL_UPGRADE, new NeedsUpgradeTransition())
+ .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
+ CHECK_STABLE, new CheckStableTransition())
+ .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
+ new CompletedAfterUpgradeTransition())
+
+ .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
+ STABLE), CHECK_STABLE, new CheckStableTransition())
+ .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING,
+ CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
+ .addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
+ new ContainerAllocatedTransition())
+
.installTopology();
public Component(
@@ -332,7 +342,7 @@ public class Component implements EventHandler<ComponentEvent> {
+ before + " to " + event.getDesired());
component.requestContainers(delta);
component.createNumCompInstances(delta);
- component.componentSpec.setState(
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
component.getScheduler().getApp().setState(ServiceState.STARTED);
return FLEXING;
@@ -430,11 +440,11 @@ public class Component implements EventHandler<ComponentEvent> {
if (component.getNumRunningInstances() + component
.getNumSucceededInstances() + component.getNumFailedInstances()
< component.getComponentSpec().getNumberOfContainers()) {
- component.componentSpec.setState(
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
return FLEXING;
} else{
- component.componentSpec.setState(
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
}
@@ -444,22 +454,22 @@ public class Component implements EventHandler<ComponentEvent> {
Component component) {
// if desired == running
if (component.componentMetrics.containersReady.value() == component
- .getComponentSpec().getNumberOfContainers()
- && component.numContainersThatNeedUpgrade.get() == 0) {
- component.componentSpec.setState(
+ .getComponentSpec().getNumberOfContainers() &&
+ !component.doesNeedUpgrade()) {
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
+ } else if (component.doesNeedUpgrade()) {
+ component.setComponentState(org.apache.hadoop.yarn.service.api.records.
+ ComponentState.NEEDS_UPGRADE);
+ return component.getState();
} else if (component.componentMetrics.containersReady.value() != component
.getComponentSpec().getNumberOfContainers()) {
- component.componentSpec.setState(
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
return FLEXING;
- } else {
- // component.numContainersThatNeedUpgrade.get() > 0
- component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
- records.ComponentState.NEEDS_UPGRADE);
- return UPGRADING;
}
+ return component.getState();
}
// This method should be called whenever there is an increment or decrement
@@ -467,22 +477,16 @@ public class Component implements EventHandler<ComponentEvent> {
//This should not matter for terminating components
private static synchronized void checkAndUpdateComponentState(
Component component, boolean isIncrement) {
- org.apache.hadoop.yarn.service.api.records.ComponentState curState =
- component.componentSpec.getState();
if (component.getRestartPolicyHandler().isLongLived()) {
if (isIncrement) {
// check if all containers are in READY state
- if (component.numContainersThatNeedUpgrade.get() == 0
- && component.componentMetrics.containersReady.value()
- == component.componentMetrics.containersDesired.value()) {
- component.componentSpec.setState(
+ if (!component.upgradeStatus.areContainersUpgrading() &&
+ !component.cancelUpgradeStatus.areContainersUpgrading() &&
+ component.componentMetrics.containersReady.value() ==
+ component.componentMetrics.containersDesired.value()) {
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
- if (curState != component.componentSpec.getState()) {
- LOG.info("[COMPONENT {}] state changed from {} -> {}",
- component.componentSpec.getName(), curState,
- component.componentSpec.getState());
- }
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
@@ -491,19 +495,14 @@ public class Component implements EventHandler<ComponentEvent> {
// still need to verify the count before changing the component state
if (component.componentMetrics.containersReady.value()
< component.componentMetrics.containersDesired.value()) {
- component.componentSpec.setState(
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState
.FLEXING);
} else if (component.componentMetrics.containersReady.value()
== component.componentMetrics.containersDesired.value()) {
- component.componentSpec.setState(
+ component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
}
- if (curState != component.componentSpec.getState()) {
- LOG.info("[COMPONENT {}] state changed from {} -> {}",
- component.componentSpec.getName(), curState,
- component.componentSpec.getState());
- }
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
@@ -511,8 +510,8 @@ public class Component implements EventHandler<ComponentEvent> {
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
- // when the service is stable then the state of component needs to
- // transition to stable
+ // triggers the state machine in component to reach appropriate state
+ // once the state in spec is changed.
component.dispatcher.getEventHandler().handle(
new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
@@ -544,25 +543,43 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
- private static class ComponentNeedsUpgradeTransition extends BaseTransition {
+ private static class CompletedAfterUpgradeTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
- component.upgradeInProgress.set(true);
- component.upgradeEvent = event;
- component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
- records.ComponentState.NEEDS_UPGRADE);
- component.numContainersThatNeedUpgrade.set(
+ Preconditions.checkNotNull(event.getContainerId());
+ component.updateMetrics(event.getStatus());
+ component.dispatcher.getEventHandler().handle(
+ new ComponentInstanceEvent(event.getContainerId(), STOP)
+ .setStatus(event.getStatus()));
+ }
+ }
+
+ private static class NeedsUpgradeTransition extends BaseTransition {
+ @Override
+ public void transition(Component component, ComponentEvent event) {
+ boolean isCancel = event.getType().equals(CANCEL_UPGRADE);
+ UpgradeStatus status = !isCancel ? component.upgradeStatus :
+ component.cancelUpgradeStatus;
+
+ status.inProgress.set(true);
+ status.targetSpec = event.getTargetSpec();
+ status.targetVersion = event.getUpgradeVersion();
+ LOG.info("[COMPONENT {}]: need upgrade to {}",
+ component.getName(), status.targetVersion);
+
+ status.containersNeedUpgrade.set(
component.componentSpec.getNumberOfContainers());
- component.componentSpec.getContainers().forEach(container -> {
- container.setState(ContainerState.NEEDS_UPGRADE);
- if (event.isExpressUpgrade()) {
- ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
- ContainerId.fromString(container.getId()),
- ComponentInstanceEventType.UPGRADE);
- LOG.info("Upgrade container {}", container.getId());
- component.dispatcher.getEventHandler().handle(upgradeEvent);
- }
+
+ component.setComponentState(org.apache.hadoop.yarn.service.api.
+ records.ComponentState.NEEDS_UPGRADE);
+
+ component.getAllComponentInstances().forEach(instance -> {
+ instance.setContainerState(ContainerState.NEEDS_UPGRADE);
});
+
+ if (event.getType().equals(CANCEL_UPGRADE)) {
+ component.upgradeStatus.reset();
+ }
}
}
@@ -572,22 +589,22 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public ComponentState transition(Component component,
ComponentEvent componentEvent) {
- org.apache.hadoop.yarn.service.api.records.ComponentState currState =
- component.componentSpec.getState();
- if (currState.equals(org.apache.hadoop.yarn.service.api.records
- .ComponentState.STABLE)) {
- return ComponentState.STABLE;
- }
// checkIfStable also updates the state in definition when STABLE
ComponentState targetState = checkIfStable(component);
- if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
- component.componentSpec.overwrite(
- component.upgradeEvent.getTargetSpec());
- component.upgradeEvent = null;
+
+ if (targetState.equals(STABLE) &&
+ !(component.upgradeStatus.isCompleted() &&
+ component.cancelUpgradeStatus.isCompleted())) {
+ // Component stable after upgrade or cancel upgrade
+ UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ?
+ component.cancelUpgradeStatus : component.upgradeStatus;
+
+ component.componentSpec.overwrite(status.getTargetSpec());
+ status.reset();
+
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
CHECK_STABLE);
component.dispatcher.getEventHandler().handle(checkStable);
- component.upgradeInProgress.set(false);
}
return targetState;
}
@@ -625,11 +642,14 @@ public class Component implements EventHandler<ComponentEvent> {
"[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
getName(), container.getId(), instance.getCompInstanceName(),
container.getNodeId());
- if (upgradeInProgress.get()) {
+ if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) {
+ UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ?
+ cancelUpgradeStatus : upgradeStatus;
+
scheduler.getContainerLaunchService()
.launchCompInstance(scheduler.getApp(), instance, container,
- createLaunchContext(upgradeEvent.getTargetSpec(),
- upgradeEvent.getUpgradeVersion()));
+ createLaunchContext(status.getTargetSpec(),
+ status.getTargetVersion()));
} else {
scheduler.getContainerLaunchService().launchCompInstance(
scheduler.getApp(), instance, container,
@@ -830,6 +850,12 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
+ private boolean doesNeedUpgrade() {
+ return cancelUpgradeStatus.areContainersUpgrading() ||
+ upgradeStatus.areContainersUpgrading() ||
+ upgradeStatus.failed.get();
+ }
+
public boolean areDependenciesReady() {
List<String> dependencies = componentSpec.getDependencies();
if (ServiceUtils.isEmpty(dependencies)) {
@@ -911,10 +937,6 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
- public void decContainersThatNeedUpgrade() {
- numContainersThatNeedUpgrade.decrementAndGet();
- }
-
public int getNumReadyInstances() {
return componentMetrics.containersReady.value();
}
@@ -972,10 +994,33 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
- public ComponentEvent getUpgradeEvent() {
+ /**
+ * Returns whether a component is upgrading or not.
+ */
+ public boolean isUpgrading() {
+ this.readLock.lock();
+
+ try {
+ return !(upgradeStatus.isCompleted() &&
+ cancelUpgradeStatus.isCompleted());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public UpgradeStatus getUpgradeStatus() {
+ this.readLock.lock();
+ try {
+ return upgradeStatus;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public UpgradeStatus getCancelUpgradeStatus() {
this.readLock.lock();
try {
- return upgradeEvent;
+ return cancelUpgradeStatus;
} finally {
this.readLock.unlock();
}
@@ -1013,6 +1058,70 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
+ /**
+ * Sets the state of the component in the component spec.
+ * @param state component state
+ */
+ private void setComponentState(
+ org.apache.hadoop.yarn.service.api.records.ComponentState state) {
+ org.apache.hadoop.yarn.service.api.records.ComponentState curState =
+ componentSpec.getState();
+ if (!curState.equals(state)) {
+ componentSpec.setState(state);
+ LOG.info("[COMPONENT {}] spec state changed from {} -> {}",
+ componentSpec.getName(), curState, state);
+ }
+ }
+
+ /**
+ * Status of upgrade.
+ */
+ public static class UpgradeStatus {
+ private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
+ private String targetVersion;
+ private AtomicBoolean inProgress = new AtomicBoolean(false);
+ private AtomicLong containersNeedUpgrade = new AtomicLong(0);
+ private AtomicBoolean failed = new AtomicBoolean(false);
+
+ public org.apache.hadoop.yarn.service.api.records.
+ Component getTargetSpec() {
+ return targetSpec;
+ }
+
+ public String getTargetVersion() {
+ return targetVersion;
+ }
+
+ /**
+ * @return whether the upgrade is completed or not
+ */
+ public boolean isCompleted() {
+ return !inProgress.get();
+ }
+
+ public void decContainersThatNeedUpgrade() {
+ if (inProgress.get()) {
+ containersNeedUpgrade.decrementAndGet();
+ }
+ }
+
+ public void containerFailedUpgrade() {
+ failed.set(true);
+ }
+
+ void reset() {
+ containersNeedUpgrade.set(0);
+ targetSpec = null;
+ targetVersion = null;
+ inProgress.set(false);
+ failed.set(false);
+ }
+
+ boolean areContainersUpgrading() {
+ return containersNeedUpgrade.get() != 0;
+ }
+ }
+
public ServiceContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 643961d..84caa77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -35,7 +35,6 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private ContainerId containerId;
private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
private String upgradeVersion;
- private boolean expressUpgrade;
public ContainerId getContainerId() {
return containerId;
@@ -114,13 +113,4 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
this.upgradeVersion = upgradeVersion;
return this;
}
-
- public boolean isExpressUpgrade() {
- return expressUpgrade;
- }
-
- public ComponentEvent setExpressUpgrade(boolean expressUpgrade) {
- this.expressUpgrade = expressUpgrade;
- return this;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
index 44d781f..d211f49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -24,6 +24,7 @@ public enum ComponentEventType {
CONTAINER_RECOVERED,
CONTAINER_STARTED,
CONTAINER_COMPLETED,
+ CANCEL_UPGRADE,
UPGRADE,
CHECK_STABLE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
index 0f63d03..e1cd0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
@@ -22,5 +22,6 @@ public enum ComponentState {
INIT,
FLEXING,
STABLE,
- UPGRADING
+ UPGRADING,
+ CANCEL_UPGRADING
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index afd8c67..89c9a22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -64,8 +65,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
+import java.util.EnumSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -84,8 +87,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
LoggerFactory.getLogger(ComponentInstance.class);
private static final String FAILED_BEFORE_LAUNCH_DIAG =
"failed before launch";
+ private static final String UPGRADE_FAILED = "upgrade failed";
- private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
+ private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
ComponentInstanceEvent> stateMachine;
private Component component;
private final ReadLock readLock;
@@ -106,7 +110,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// This container object is used for rest API query
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
private String serviceVersion;
-
+ private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
+ private boolean pendingCancelUpgrade = false;
private static final StateMachineFactory<ComponentInstance,
ComponentInstanceState, ComponentInstanceEventType,
@@ -132,13 +137,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.addTransition(READY, STARTED, BECOME_NOT_READY,
new ContainerBecomeNotReadyTransition())
.addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
- .addTransition(READY, UPGRADING, UPGRADE,
- new ContainerUpgradeTransition())
- .addTransition(UPGRADING, UPGRADING, UPGRADE,
- new ContainerUpgradeTransition())
- .addTransition(UPGRADING, READY, BECOME_READY,
- new ContainerBecomeReadyTransition())
- .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
+ .addTransition(READY, UPGRADING, UPGRADE, new UpgradeTransition())
+ .addTransition(READY, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE,
+ new CancelUpgradeTransition())
+
+ // FROM UPGRADING
+ .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING),
+ CANCEL_UPGRADE, new CancelUpgradeTransition())
+ .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY,
+ new ReadyAfterUpgradeTransition())
+ .addTransition(UPGRADING, UPGRADING, STOP,
+ new StoppedAfterUpgradeTransition())
+
+ // FROM CANCEL_UPGRADING
+ .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY),
+ BECOME_READY, new ReadyAfterUpgradeTransition())
+ .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
+ STOP, new StoppedAfterCancelUpgradeTransition())
.installTopology();
public ComponentInstance(Component component,
@@ -217,24 +232,53 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
- compInstance.containerSpec.setState(ContainerState.READY);
- if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
- compInstance.component.incContainersReady(false);
- compInstance.component.decContainersThatNeedUpgrade();
- compInstance.serviceVersion = compInstance.component.getUpgradeEvent()
- .getUpgradeVersion();
- ComponentEvent checkState = new ComponentEvent(
- compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
- compInstance.scheduler.getDispatcher().getEventHandler().handle(
- checkState);
+ compInstance.setContainerState(ContainerState.READY);
+ compInstance.component.incContainersReady(true);
+ compInstance.postContainerReady();
+ }
+ }
- } else {
- compInstance.component.incContainersReady(true);
- }
- if (compInstance.timelineServiceEnabled) {
- compInstance.serviceTimelinePublisher
- .componentInstanceBecomeReady(compInstance.containerSpec);
+ /**
+ * Handles BECOME_READY for an instance that is in UPGRADING or
+ * CANCEL_UPGRADING.
+ * <p>
+ * When a cancel is pending, the re-initialization for the cancel is started
+ * and the instance keeps its current state. Otherwise the instance is
+ * marked ready, the matching upgrade status is updated and READY is
+ * returned.
+ */
+ private static class ReadyAfterUpgradeTransition implements
+ MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+ ComponentInstanceState> {
+
+ @Override
+ public ComponentInstanceState transition(ComponentInstance instance,
+ ComponentInstanceEvent event) {
+
+ if (instance.pendingCancelUpgrade) {
+ // A cancel was requested while upgradeInProgress was already set, so it
+ // was only recorded as pending. The container has now reported ready:
+ // start the re-init for the cancel and stay in the current state.
+ LOG.info("{} received ready but cancellation pending",
+ event.getContainerId());
+ instance.upgradeInProgress.set(true);
+ instance.cancelUpgrade();
+ instance.pendingCancelUpgrade = false;
+ return instance.getState();
}
+
+ instance.upgradeInProgress.set(false);
+ instance.setContainerState(ContainerState.READY);
+ instance.component.incContainersReady(false);
+
+ // Pick the status that matches the state this instance is leaving.
+ Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
+ instance.component.getUpgradeStatus() :
+ instance.component.getCancelUpgradeStatus();
+ status.decContainersThatNeedUpgrade();
+
+ // The instance now runs the target version of that status.
+ instance.serviceVersion = status.getTargetVersion();
+ ComponentEvent checkState = new ComponentEvent(
+ instance.component.getName(),
+ ComponentEventType.CHECK_STABLE);
+ instance.scheduler.getDispatcher().getEventHandler().handle(checkState);
+ instance.postContainerReady();
+ return ComponentInstanceState.READY;
+ }
+ }
+
+ /**
+ * Reports the container spec to the timeline publisher as ready. Does
+ * nothing when the timeline service is not enabled.
+ */
+ private void postContainerReady() {
+ if (timelineServiceEnabled) {
+ serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec);
}
}
@@ -242,7 +286,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
- compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
+ compInstance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
compInstance.component.decContainersReady(true);
}
}
@@ -276,11 +320,13 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
" completed. Reinsert back to pending list and requested ");
builder.append("a new container.").append(System.lineSeparator());
builder.append(" exitStatus=").append(
- failureBeforeLaunch ? null : event.getStatus().getExitStatus());
+ failureBeforeLaunch || event.getStatus() == null ? null :
+ event.getStatus().getExitStatus());
builder.append(", diagnostics=");
builder.append(failureBeforeLaunch ?
FAILED_BEFORE_LAUNCH_DIAG :
- event.getStatus().getDiagnostics());
+ (event.getStatus() != null ? event.getStatus().getDiagnostics() :
+ UPGRADE_FAILED));
if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
LOG.error(builder.toString());
@@ -342,15 +388,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
ComponentInstanceEvent event) {
Component comp = compInstance.component;
+ ContainerStatus status = event.getStatus();
+ // status is not available when upgrade fails
String containerDiag = compInstance.getCompInstanceId() + ": " + (
- failedBeforeLaunching ?
- FAILED_BEFORE_LAUNCH_DIAG :
- event.getStatus().getDiagnostics());
+ failedBeforeLaunching ? FAILED_BEFORE_LAUNCH_DIAG :
+ (status != null ? status.getDiagnostics() : UPGRADE_FAILED));
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
- if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
- compInstance.component.decContainersThatNeedUpgrade();
- }
+
if (compInstance.getState().equals(READY)) {
compInstance.component.decContainersReady(true);
}
@@ -387,10 +432,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// record in ATS
compInstance.scheduler.getServiceTimelinePublisher()
.componentInstanceFinished(compInstance.getContainer().getId(),
- failedBeforeLaunching ?
- -1 :
- event.getStatus().getExitStatus(), ContainerState.FAILED,
- containerDiag);
+ failedBeforeLaunching || status == null ? -1 :
+ status.getExitStatus(),
+ ContainerState.FAILED, containerDiag);
// mark other component-instances/containers as STOPPED
for (ContainerId containerId : scheduler.getLiveInstances()
@@ -449,28 +493,129 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.equals(state) || ContainerState.SUCCEEDED.equals(state);
}
- private static class ContainerUpgradeTransition extends BaseTransition {
+ private static class StoppedAfterUpgradeTransition extends
+ BaseTransition {
@Override
- public void transition(ComponentInstance compInstance,
+ public void transition(ComponentInstance instance,
ComponentInstanceEvent event) {
- if (!compInstance.containerSpec.getState().equals(
- ContainerState.NEEDS_UPGRADE)) {
- //nothing to upgrade. this may happen with express upgrade.
+ instance.component.getUpgradeStatus().decContainersThatNeedUpgrade();
+ instance.component.decRunningContainers();
+
+ final ServiceScheduler scheduler = instance.component.getScheduler();
+ scheduler.getAmRMClient().releaseAssignedContainer(
+ event.getContainerId());
+ instance.scheduler.executorService.submit(
+ () -> instance.cleanupRegistry(event.getContainerId()));
+ scheduler.removeLiveCompInstance(event.getContainerId());
+ instance.component.getUpgradeStatus().containerFailedUpgrade();
+ instance.setContainerState(ContainerState.FAILED_UPGRADE);
+ instance.upgradeInProgress.set(false);
+ }
+ }
+
+ /**
+ * Handles STOP for an instance that is in CANCEL_UPGRADING.
+ * <p>
+ * When a cancel is pending, the re-initialization for the cancel is started
+ * and the instance keeps its current state. Otherwise the cancel-upgrade
+ * status is updated, the regular stopped handling runs and INIT is
+ * returned.
+ */
+ private static class StoppedAfterCancelUpgradeTransition implements
+ MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+ ComponentInstanceState> {
+
+ // Delegate used for the regular container-stopped handling.
+ private ContainerStoppedTransition stoppedTransition =
+ new ContainerStoppedTransition();
+
+ @Override
+ public ComponentInstanceState transition(ComponentInstance instance,
+ ComponentInstanceEvent event) {
+ if (instance.pendingCancelUpgrade) {
+ // A cancel was requested while upgradeInProgress was already set, so it
+ // was only recorded as pending. The container has now stopped: start
+ // the re-init for the cancel and stay in the current state.
+ LOG.info("{} received stopped but cancellation pending",
+ event.getContainerId());
+ instance.upgradeInProgress.set(true);
+ instance.cancelUpgrade();
+ instance.pendingCancelUpgrade = false;
+ return instance.getState();
+ }
+
+ // When upgrade is cancelled, and container re-init fails
+ instance.component.getCancelUpgradeStatus()
+ .decContainersThatNeedUpgrade();
+ instance.upgradeInProgress.set(false);
+ stoppedTransition.transition(instance, event);
+ return ComponentInstanceState.INIT;
+ }
+ }
+
+ private static class UpgradeTransition extends BaseTransition {
+
+ @Override
+ public void transition(ComponentInstance instance,
+ ComponentInstanceEvent event) {
+ if (!instance.component.getCancelUpgradeStatus().isCompleted()) {
+ // last check to see if cancellation was triggered. The component may
+ // have processed the cancel upgrade event but the instance doesn't know
+ // it yet. If cancellation has been triggered then no point in
+ // upgrading.
return;
}
- compInstance.containerSpec.setState(ContainerState.UPGRADING);
- compInstance.component.decContainersReady(false);
- ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
- compInstance.scheduler.getContainerLaunchService()
- .reInitCompInstance(compInstance.scheduler.getApp(), compInstance,
- compInstance.container,
- compInstance.component.createLaunchContext(
- upgradeEvent.getTargetSpec(),
- upgradeEvent.getUpgradeVersion()));
+ instance.upgradeInProgress.set(true);
+ instance.setContainerState(ContainerState.UPGRADING);
+ instance.component.decContainersReady(false);
+
+ Component.UpgradeStatus status = instance.component.getUpgradeStatus();
+ instance.scheduler.getContainerLaunchService()
+ .reInitCompInstance(instance.scheduler.getApp(), instance,
+ instance.container,
+ instance.component.createLaunchContext(
+ status.getTargetSpec(),
+ status.getTargetVersion()));
}
}
+ /**
+ * Handles CANCEL_UPGRADE. Returns READY when the instance is already at the
+ * cancel target version. Otherwise returns CANCEL_UPGRADING, either after
+ * starting the re-initialization or after recording the cancel as pending.
+ */
+ private static class CancelUpgradeTransition implements
+ MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
+ ComponentInstanceState> {
+
+ @Override
+ public ComponentInstanceState transition(ComponentInstance instance,
+ ComponentInstanceEvent event) {
+ // Succeeds only when the in-progress flag was not already set.
+ if (instance.upgradeInProgress.compareAndSet(false, true)) {
+
+ Component.UpgradeStatus cancelStatus = instance.component
+ .getCancelUpgradeStatus();
+
+ if (instance.getServiceVersion().equals(
+ cancelStatus.getTargetVersion())) {
+ // previous upgrade didn't happen so just go back to READY
+ // NOTE(review): upgradeInProgress was set to true by the compareAndSet
+ // above and is not reset on this path - confirm this is intended.
+ LOG.info("{} nothing to cancel", event.getContainerId());
+ cancelStatus.decContainersThatNeedUpgrade();
+ instance.setContainerState(ContainerState.READY);
+ ComponentEvent checkState = new ComponentEvent(
+ instance.component.getName(), ComponentEventType.CHECK_STABLE);
+ instance.scheduler.getDispatcher().getEventHandler()
+ .handle(checkState);
+ return ComponentInstanceState.READY;
+ } else {
+ // Version differs from the cancel target: re-init to the cancel target.
+ instance.component.decContainersReady(false);
+ instance.cancelUpgrade();
+ }
+ } else {
+ // Flag already set: only record the cancel; the ready/stopped handlers
+ // for CANCEL_UPGRADING check pendingCancelUpgrade and start it later.
+ LOG.info("{} pending cancellation", event.getContainerId());
+ instance.pendingCancelUpgrade = true;
+ }
+ return ComponentInstanceState.CANCEL_UPGRADING;
+ }
+ }
+
+ /**
+ * Starts the re-initialization of this instance's container with a launch
+ * context built from the target spec and target version of the component's
+ * cancel-upgrade status.
+ */
+ private void cancelUpgrade() {
+ LOG.info("{} cancelling upgrade", container.getId());
+ // The spec state used while cancelling is UPGRADING as well.
+ setContainerState(ContainerState.UPGRADING);
+ Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
+ scheduler.getContainerLaunchService()
+ .reInitCompInstance(scheduler.getApp(), this,
+ this.container, this.component.createLaunchContext(
+ cancelStatus.getTargetSpec(),
+ cancelStatus.getTargetVersion()));
+ }
+
public ComponentInstanceState getState() {
this.readLock.lock();
@@ -505,6 +650,26 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
+ /**
+ * Sets the state of the container in the container spec. The update is
+ * made while holding the write lock. The spec is modified, and the change
+ * logged, only when the new state differs from the current one.
+ *
+ * @param state container state
+ */
+ public void setContainerState(ContainerState state) {
+ this.writeLock.lock();
+ try {
+ ContainerState curState = containerSpec.getState();
+ // NOTE(review): equals() is called on curState, so this assumes the spec
+ // always has a state set - confirm.
+ if (!curState.equals(state)) {
+ containerSpec.setState(state);
+ // NOTE(review): the message below repeats the word "state".
+ LOG.info("{} spec state state changed from {} -> {}",
+ getCompInstanceId(), curState, state);
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
@Override
public void handle(ComponentInstanceEvent event) {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
index 665b8fa..b9181e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
@@ -23,5 +23,6 @@ public enum ComponentInstanceEventType {
STOP,
BECOME_READY,
BECOME_NOT_READY,
- UPGRADE
+ UPGRADE,
+ CANCEL_UPGRADE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
index f5de5cb..28cbcf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
@@ -22,5 +22,6 @@ public enum ComponentInstanceState {
INIT,
STARTED,
READY,
- UPGRADING
+ UPGRADING,
+ CANCEL_UPGRADING
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org