You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:28 UTC
[11/50] [abbrv] hadoop git commit: YARN-8298. Added express upgrade
for YARN service. Contributed by Chandni Singh
YARN-8298. Added express upgrade for YARN service.
Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e557c6bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e557c6bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e557c6bd
Branch: refs/heads/YARN-3409
Commit: e557c6bd8de2811a561210f672f47b4d07a9d5c6
Parents: 9c3fc3e
Author: Eric Yang <ey...@apache.org>
Authored: Tue Aug 21 19:49:26 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Tue Aug 21 19:49:26 2018 -0400
----------------------------------------------------------------------
.../yarn/service/client/ApiServiceClient.java | 20 +
.../hadoop/yarn/service/webapp/ApiServer.java | 12 +-
.../hadoop/yarn/service/ClientAMService.java | 2 +-
.../hadoop/yarn/service/ServiceEvent.java | 25 +
.../hadoop/yarn/service/ServiceManager.java | 127 +++-
.../hadoop/yarn/service/ServiceScheduler.java | 15 +-
.../yarn/service/api/records/ServiceState.java | 2 +-
.../yarn/service/client/ServiceClient.java | 100 ++-
.../yarn/service/component/Component.java | 16 +-
.../yarn/service/component/ComponentEvent.java | 10 +
.../component/instance/ComponentInstance.java | 5 +
.../yarn/service/utils/ServiceApiUtil.java | 44 ++
.../src/main/proto/ClientAMProtocol.proto | 1 +
.../hadoop/yarn/service/TestServiceApiUtil.java | 653 ----------------
.../hadoop/yarn/service/TestServiceManager.java | 299 +++++---
.../yarn/service/TestYarnNativeServices.java | 35 +
.../yarn/service/utils/TestServiceApiUtil.java | 743 +++++++++++++++++++
.../hadoop/yarn/client/cli/ApplicationCLI.java | 20 +-
.../hadoop/yarn/client/cli/TestYarnCLI.java | 4 +
.../hadoop/yarn/client/api/AppAdminClient.java | 12 +
20 files changed, 1308 insertions(+), 837 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index 9229446..ca6cc50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -601,6 +601,26 @@ public class ApiServiceClient extends AppAdminClient {
}
@Override
+ public int actionUpgradeExpress(String appName, File path)
+ throws IOException, YarnException {
+ int result;
+ try {
+ Service service =
+ loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null);
+ service.setState(ServiceState.EXPRESS_UPGRADING);
+ String buffer = jsonSerDeser.toJson(service);
+ LOG.info("Upgrade in progress. Please wait..");
+ ClientResponse response = getApiClient(getServicePath(appName))
+ .put(ClientResponse.class, buffer);
+ result = processResponse(response);
+ } catch (Exception e) {
+ LOG.error("Failed to upgrade application: ", e);
+ result = EXIT_EXCEPTION_THROWN;
+ }
+ return result;
+ }
+
+ @Override
public int initiateUpgrade(String appName,
String fileName, boolean autoFinalize) throws IOException, YarnException {
int result;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index 4db0ac8..cd6f0d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -440,7 +440,8 @@ public class ApiServer {
if (updateServiceData.getState() != null && (
updateServiceData.getState() == ServiceState.UPGRADING ||
updateServiceData.getState() ==
- ServiceState.UPGRADING_AUTO_FINALIZE)) {
+ ServiceState.UPGRADING_AUTO_FINALIZE) ||
+ updateServiceData.getState() == ServiceState.EXPRESS_UPGRADING) {
return upgradeService(updateServiceData, ugi);
}
@@ -690,7 +691,11 @@ public class ApiServer {
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
- sc.initiateUpgrade(service);
+ if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
+ sc.actionUpgradeExpress(service);
+ } else {
+ sc.initiateUpgrade(service);
+ }
sc.close();
return null;
});
@@ -706,7 +711,8 @@ public class ApiServer {
String serviceName, Set<String> compNames) throws YarnException,
IOException, InterruptedException {
Service service = getServiceFromClient(ugi, serviceName);
- if (service.getState() != ServiceState.UPGRADING) {
+ if (!service.getState().equals(ServiceState.UPGRADING) &&
+ !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
throw new YarnException(
String.format("The upgrade of service %s has not been initiated.",
service.getName()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 5bf1833..2ef8f7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -166,7 +166,7 @@ public class ClientAMService extends AbstractService
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
UserGroupInformation.getCurrentUser());
context.getServiceManager().processUpgradeRequest(request.getVersion(),
- request.getAutoFinalize());
+ request.getAutoFinalize(), request.getExpressUpgrade());
return UpgradeServiceResponseProto.newBuilder().build();
} catch (Exception ex) {
return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
index 0196be2..3a55472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.service.api.records.Component;
+
+import java.util.Queue;
/**
* Events are handled by {@link ServiceManager} to manage the service
@@ -29,6 +32,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
private final ServiceEventType type;
private String version;
private boolean autoFinalize;
+ private boolean expressUpgrade;
+ private Queue<Component> compsToUpgradeInOrder;
public ServiceEvent(ServiceEventType serviceEventType) {
super(serviceEventType);
@@ -56,4 +61,24 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
this.autoFinalize = autoFinalize;
return this;
}
+
+ public boolean isExpressUpgrade() {
+ return expressUpgrade;
+ }
+
+ public ServiceEvent setExpressUpgrade(boolean expressUpgrade) {
+ this.expressUpgrade = expressUpgrade;
+ return this;
+ }
+
+ public Queue<Component> getCompsToUpgradeInOrder() {
+ return compsToUpgradeInOrder;
+ }
+
+ public ServiceEvent setCompsToUpgradeInOrder(
+ Queue<Component> compsToUpgradeInOrder) {
+ this.compsToUpgradeInOrder = compsToUpgradeInOrder;
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index 05ecb3f..04454b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
@@ -40,8 +41,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@@ -67,6 +71,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
private final SliderFileSystem fs;
private String upgradeVersion;
+ private Queue<org.apache.hadoop.yarn.service.api.records
+ .Component> compsToUpgradeInOrder;
private static final StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@@ -141,14 +147,20 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
@Override
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
+ serviceManager.upgradeVersion = event.getVersion();
try {
- if (!event.isAutoFinalize()) {
- serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
+ if (event.isExpressUpgrade()) {
+ serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
+ serviceManager.compsToUpgradeInOrder = event
+ .getCompsToUpgradeInOrder();
+ serviceManager.upgradeNextCompIfAny();
+ } else if (event.isAutoFinalize()) {
+ serviceManager.serviceSpec.setState(ServiceState
+ .UPGRADING_AUTO_FINALIZE);
} else {
serviceManager.serviceSpec.setState(
- ServiceState.UPGRADING_AUTO_FINALIZE);
+ ServiceState.UPGRADING);
}
- serviceManager.upgradeVersion = event.getVersion();
return State.UPGRADING;
} catch (Throwable e) {
LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@@ -169,8 +181,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
if (currState.equals(ServiceState.STABLE)) {
return State.STABLE;
}
+ if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
+ org.apache.hadoop.yarn.service.api.records.Component component =
+ serviceManager.compsToUpgradeInOrder.peek();
+ if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
+ !component.getState().equals(ComponentState.UPGRADING)) {
+ serviceManager.compsToUpgradeInOrder.remove();
+ }
+ serviceManager.upgradeNextCompIfAny();
+ }
if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
- event.getType().equals(ServiceEventType.START)) {
+ event.getType().equals(ServiceEventType.START) ||
+ (currState.equals(ServiceState.EXPRESS_UPGRADING) &&
+ serviceManager.compsToUpgradeInOrder.isEmpty())) {
ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
if (targetState.equals(ServiceState.STABLE)) {
if (serviceManager.finalizeUpgrade()) {
@@ -184,6 +207,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
}
}
+ private void upgradeNextCompIfAny() {
+ if (!compsToUpgradeInOrder.isEmpty()) {
+ org.apache.hadoop.yarn.service.api.records.Component component =
+ compsToUpgradeInOrder.peek();
+
+ ComponentEvent needUpgradeEvent = new ComponentEvent(
+ component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
+ component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ needUpgradeEvent);
+ }
+ }
+
/**
* @return whether finalization of upgrade was successful.
*/
@@ -250,23 +286,18 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
}
void processUpgradeRequest(String upgradeVersion,
- boolean autoFinalize) throws IOException {
+ boolean autoFinalize, boolean expressUpgrade) throws IOException {
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
context.fs, context.service.getName(), upgradeVersion);
List<org.apache.hadoop.yarn.service.api.records.Component>
- compsThatNeedUpgrade = componentsFinder.
+ compsNeedUpgradeList = componentsFinder.
findTargetComponentSpecs(context.service, targetSpec);
- ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
- .setVersion(upgradeVersion)
- .setAutoFinalize(autoFinalize);
- context.scheduler.getDispatcher().getEventHandler().handle(event);
- if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
- if (autoFinalize) {
- event.setAutoFinalize(true);
- }
- compsThatNeedUpgrade.forEach(component -> {
+ // Remove all components from the need-upgrade list if their restart policy
+ // doesn't allow upgrades.
+ if (compsNeedUpgradeList != null) {
+ compsNeedUpgradeList.removeIf(component -> {
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy = component.getRestartPolicy();
@@ -274,25 +305,65 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
Component.getRestartPolicyHandler(restartPolicy);
// Do not allow upgrades for components which have NEVER/ON_FAILURE
// restart policy
- if (restartPolicyHandler.allowUpgrades()) {
+ if (!restartPolicyHandler.allowUpgrades()) {
+ LOG.info("The component {} has a restart policy that doesnt " +
+ "allow upgrades {} ", component.getName(),
+ component.getRestartPolicy().toString());
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
+ .setVersion(upgradeVersion)
+ .setAutoFinalize(autoFinalize)
+ .setExpressUpgrade(expressUpgrade);
+
+ if (expressUpgrade) {
+ // In case of express upgrade components need to be upgraded in order.
+ // Once the service manager gets notified that a component finished
+ // upgrading, it then issues an event to upgrade the next component.
+ Map<String, org.apache.hadoop.yarn.service.api.records.Component>
+ compsNeedUpgradeByName = new HashMap<>();
+ if (compsNeedUpgradeList != null) {
+ compsNeedUpgradeList.forEach(component ->
+ compsNeedUpgradeByName.put(component.getName(), component));
+ }
+ List<String> resolvedComps = ServiceApiUtil
+ .resolveCompsDependency(targetSpec);
+
+ Queue<org.apache.hadoop.yarn.service.api.records.Component>
+ orderedCompUpgrade = new LinkedList<>();
+ resolvedComps.forEach(compName -> {
+ org.apache.hadoop.yarn.service.api.records.Component component =
+ compsNeedUpgradeByName.get(compName);
+ if (component != null ) {
+ orderedCompUpgrade.add(component);
+ }
+ });
+ event.setCompsToUpgradeInOrder(orderedCompUpgrade);
+ }
+
+ context.scheduler.getDispatcher().getEventHandler().handle(event);
+
+ if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
+ if (!expressUpgrade) {
+ compsNeedUpgradeList.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
component).setUpgradeVersion(event.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(
needUpgradeEvent);
- } else {
- LOG.info("The component {} has a restart "
- + "policy that doesnt allow upgrades {} ", component.getName(),
- component.getRestartPolicy().toString());
- }
- });
- } else {
+
+ });
+ }
+ } else if (autoFinalize) {
// nothing to upgrade if upgrade auto finalize is requested, trigger a
// state check.
- if (autoFinalize) {
- context.scheduler.getDispatcher().getEventHandler().handle(
- new ServiceEvent(ServiceEventType.CHECK_STABLE));
- }
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ new ServiceEvent(ServiceEventType.CHECK_STABLE));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 0801ad0..384659f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -219,7 +219,7 @@ public class ServiceScheduler extends CompositeService {
nmClient.getClient().cleanupRunningContainersOnStop(false);
addIfService(nmClient);
- dispatcher = new AsyncDispatcher("Component dispatcher");
+ dispatcher = createAsyncDispatcher();
dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
dispatcher.register(ComponentEventType.class,
new ComponentEventHandler());
@@ -253,6 +253,9 @@ public class ServiceScheduler extends CompositeService {
YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS,
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
app.getConfiguration(), getConfig());
+
+ serviceManager = createServiceManager();
+ context.setServiceManager(serviceManager);
}
protected YarnRegistryViewForProviders createYarnRegistryOperations(
@@ -262,6 +265,14 @@ public class ServiceScheduler extends CompositeService {
context.attemptId);
}
+ protected ServiceManager createServiceManager() {
+ return new ServiceManager(context);
+ }
+
+ protected AsyncDispatcher createAsyncDispatcher() {
+ return new AsyncDispatcher("Component dispatcher");
+ }
+
protected NMClientAsync createNMClient() {
return NMClientAsync.createNMClientAsync(new NMClientCallback());
}
@@ -344,8 +355,6 @@ public class ServiceScheduler extends CompositeService {
// Since AM has been started and registered, the service is in STARTED state
app.setState(ServiceState.STARTED);
- serviceManager = new ServiceManager(context);
- context.setServiceManager(serviceManager);
// recover components based on containers sent from RM
recoverComponents(response);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index b6ae38b..0b3c037 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
- UPGRADING_AUTO_FINALIZE;
+ UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index 5668d9f..a27ed87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -215,48 +216,31 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return EXIT_SUCCESS;
}
- @Override
- public int initiateUpgrade(String appName, String fileName,
- boolean autoFinalize)
- throws IOException, YarnException {
- Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
- null, null);
- if (autoFinalize) {
- upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
- } else {
- upgradeService.setState(ServiceState.UPGRADING);
- }
- return initiateUpgrade(upgradeService);
- }
-
- public int initiateUpgrade(Service service) throws YarnException,
- IOException {
+ private ApplicationReport upgradePrecheck(Service service)
+ throws YarnException, IOException {
boolean upgradeEnabled = getConfig().getBoolean(
- YARN_SERVICE_UPGRADE_ENABLED,
- YARN_SERVICE_UPGRADE_ENABLED_DEFAULT);
+ YARN_SERVICE_UPGRADE_ENABLED, YARN_SERVICE_UPGRADE_ENABLED_DEFAULT);
if (!upgradeEnabled) {
throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED);
}
- Service persistedService =
- ServiceApiUtil.loadService(fs, service.getName());
+ Service persistedService = ServiceApiUtil.loadService(fs,
+ service.getName());
if (!StringUtils.isEmpty(persistedService.getId())) {
- cachedAppInfo.put(persistedService.getName(), new AppInfo(
- ApplicationId.fromString(persistedService.getId()),
- persistedService.getKerberosPrincipal().getPrincipalName()));
+ cachedAppInfo.put(persistedService.getName(),
+ new AppInfo(ApplicationId.fromString(persistedService.getId()),
+ persistedService.getKerberosPrincipal().getPrincipalName()));
}
if (persistedService.getVersion().equals(service.getVersion())) {
- String message =
- service.getName() + " is already at version " + service.getVersion()
- + ". There is nothing to upgrade.";
+ String message = service.getName() + " is already at version "
+ + service.getVersion() + ". There is nothing to upgrade.";
LOG.error(message);
throw new YarnException(message);
}
Service liveService = getStatus(service.getName());
if (!liveService.getState().equals(ServiceState.STABLE)) {
- String message = service.getName() + " is at " +
- liveService.getState()
+ String message = service.getName() + " is at " + liveService.getState()
+ " state and upgrade can only be initiated when service is STABLE.";
LOG.error(message);
throw new YarnException(message);
@@ -266,11 +250,67 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
- ApplicationReport appReport =
- yarnClient.getApplicationReport(getAppId(service.getName()));
+ ApplicationReport appReport = yarnClient
+ .getApplicationReport(getAppId(service.getName()));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(service.getName() + " AM hostname is empty");
}
+ return appReport;
+ }
+
+ @Override
+ public int actionUpgradeExpress(String appName, File path)
+ throws IOException, YarnException {
+ Service service =
+ loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null);
+ service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+ actionUpgradeExpress(service);
+ return EXIT_SUCCESS;
+ }
+
+ public int actionUpgradeExpress(Service service) throws YarnException,
+ IOException {
+ ApplicationReport appReport = upgradePrecheck(service);
+ ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
+ UpgradeServiceRequestProto.Builder requestBuilder =
+ UpgradeServiceRequestProto.newBuilder();
+ requestBuilder.setVersion(service.getVersion());
+ if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+ requestBuilder.setAutoFinalize(true);
+ }
+ if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
+ requestBuilder.setExpressUpgrade(true);
+ requestBuilder.setAutoFinalize(true);
+ }
+ UpgradeServiceResponseProto responseProto = proxy.upgrade(
+ requestBuilder.build());
+ if (responseProto.hasError()) {
+ LOG.error("Service {} express upgrade to version {} failed because {}",
+ service.getName(), service.getVersion(), responseProto.getError());
+ throw new YarnException("Failed to express upgrade service " +
+ service.getName() + " to version " + service.getVersion() +
+ " because " + responseProto.getError());
+ }
+ return EXIT_SUCCESS;
+ }
+
+ @Override
+ public int initiateUpgrade(String appName, String fileName,
+ boolean autoFinalize)
+ throws IOException, YarnException {
+ Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
+ null, null);
+ if (autoFinalize) {
+ upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+ } else {
+ upgradeService.setState(ServiceState.UPGRADING);
+ }
+ return initiateUpgrade(upgradeService);
+ }
+
+ public int initiateUpgrade(Service service) throws YarnException,
+ IOException {
+ ApplicationReport appReport = upgradePrecheck(service);
ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
UpgradeServiceRequestProto.Builder requestBuilder =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 41a2fcd..acf3404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.component;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import static org.apache.hadoop.yarn.service.api.records.Component
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext;
@@ -546,13 +548,21 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public void transition(Component component, ComponentEvent event) {
component.upgradeInProgress.set(true);
+ component.upgradeEvent = event; // stored before the per-container UPGRADE events below are handed to the dispatcher
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
component.numContainersThatNeedUpgrade.set(
component.componentSpec.getNumberOfContainers());
- component.componentSpec.getContainers().forEach(container ->
- container.setState(ContainerState.NEEDS_UPGRADE));
- component.upgradeEvent = event;
+ component.componentSpec.getContainers().forEach(container -> {
+ container.setState(ContainerState.NEEDS_UPGRADE); // every container in the component spec is flagged
+ if (event.isExpressUpgrade()) { // express upgrade: also send an UPGRADE instance event for this container
+ ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+ ContainerId.fromString(container.getId()),
+ ComponentInstanceEventType.UPGRADE);
+ LOG.info("Upgrade container {}", container.getId());
+ component.dispatcher.getEventHandler().handle(upgradeEvent);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 84caa77..643961d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -35,6 +35,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private ContainerId containerId;
private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
private String upgradeVersion;
+ private boolean expressUpgrade;
public ContainerId getContainerId() {
return containerId;
@@ -113,4 +114,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
this.upgradeVersion = upgradeVersion;
return this;
}
+
+ public boolean isExpressUpgrade() { // reads the expressUpgrade flag carried by this event
+ return expressUpgrade;
+ }
+
+ public ComponentEvent setExpressUpgrade(boolean expressUpgrade) { // stores the expressUpgrade flag on this event
+ this.expressUpgrade = expressUpgrade;
+ return this; // returns this event so setter calls can be chained
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 11a6caa..ed5e68e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -380,6 +380,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
+ if (!compInstance.containerSpec.getState().equals(
+ ContainerState.NEEDS_UPGRADE)) {
+ //nothing to upgrade. this may happen with express upgrade.
+ return;
+ }
compInstance.containerSpec.setState(ContainerState.UPGRADING);
compInstance.component.decContainersReady(false);
ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index 9219569..b588e88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -638,6 +638,32 @@ public class ServiceApiUtil {
return containerNeedUpgrade;
}
+ /**
+  * Validates that every requested component of the live service is in the
+  * STABLE state and collects the containers of those components that are in
+  * the READY state.
+  *
+  * @param liveService the service whose components are inspected
+  * @param compNames   names of the requested components; must not be null
+  * @return the READY containers of the requested components
+  * @throws YarnException if a requested component is not in the STABLE state
+  */
+ public static List<Container> validateAndResolveCompsStable(
+     Service liveService, Collection<String> compNames) throws YarnException {
+   Preconditions.checkNotNull(compNames);
+   HashSet<String> requestedComps = Sets.newHashSet(compNames);
+   List<Container> containerNeedUpgrade = new ArrayList<>();
+   for (Component liveComp : liveService.getComponents()) {
+     if (!requestedComps.contains(liveComp.getName())) {
+       continue;
+     }
+     // Constant-first equals: a component whose state is unset is rejected
+     // as "not stable" instead of failing with a NullPointerException.
+     if (!ComponentState.STABLE.equals(liveComp.getState())) {
+       throw new YarnException(String.format(
+           ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
+     }
+     for (Container liveContainer : liveComp.getContainers()) {
+       // A container without a state is simply not READY; skip it.
+       if (ContainerState.READY.equals(liveContainer.getState())) {
+         containerNeedUpgrade.add(liveContainer);
+       }
+     }
+   }
+   return containerNeedUpgrade;
+ }
+
private static String parseComponentName(String componentInstanceName)
throws YarnException {
int idx = componentInstanceName.lastIndexOf('-');
@@ -651,4 +677,22 @@ public class ServiceApiUtil {
public static String $(String s) {
return "${" + s +"}";
}
+
+ /**
+  * Orders the component names of a service so that every component appears
+  * after the components it depends on.
+  *
+  * Names are emitted in depth-first post-order, starting from the service's
+  * components in their declared order, so transitive dependencies are
+  * honoured. A dependency name that matches no component of the service is
+  * still emitted (before its dependent). Components on a dependency cycle
+  * are each emitted exactly once; no order can satisfy a cycle, so their
+  * relative order is then best-effort.
+  *
+  * @param service the service whose components are ordered
+  * @return component names, dependencies first
+  */
+ public static List<String> resolveCompsDependency(Service service) {
+   List<String> ordered = new ArrayList<String>();
+   List<String> inProgress = new ArrayList<String>();
+   for (Component component : service.getComponents()) {
+     appendAfterDependencies(service, component.getName(), ordered,
+         inProgress);
+   }
+   return ordered;
+ }
+
+ /**
+  * Appends compName to ordered after (recursively) appending everything it
+  * depends on. inProgress holds the names currently being expanded and is
+  * what stops the recursion on a dependency cycle.
+  */
+ private static void appendAfterDependencies(Service service,
+     String compName, List<String> ordered, List<String> inProgress) {
+   if (ordered.contains(compName) || inProgress.contains(compName)) {
+     return;
+   }
+   inProgress.add(compName);
+   for (Component candidate : service.getComponents()) {
+     // A missing dependency list is treated as "no dependencies".
+     if (candidate.getName().equals(compName)
+         && candidate.getDependencies() != null) {
+       for (String depComp : candidate.getDependencies()) {
+         appendAfterDependencies(service, depComp, ordered, inProgress);
+       }
+     }
+   }
+   inProgress.remove(compName);
+   ordered.add(compName);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
index 6166ded..169f765 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
@@ -66,6 +66,7 @@ message StopResponseProto {
message UpgradeServiceRequestProto {
optional string version = 1;
optional bool autoFinalize = 2;
+ optional bool expressUpgrade = 3; // new optional field on the next free tag (1 and 2 are taken above)
}
message UpgradeServiceResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
deleted file mode 100644
index c2a80e7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
+++ /dev/null
@@ -1,653 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.service;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.service.api.records.Artifact;
-import org.apache.hadoop.yarn.service.api.records.Component;
-import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
-import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
-import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
-import org.apache.hadoop.yarn.service.api.records.PlacementScope;
-import org.apache.hadoop.yarn.service.api.records.PlacementType;
-import org.apache.hadoop.yarn.service.api.records.Resource;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.hadoop.yarn.service.conf.RestApiConstants.DEFAULT_UNLIMITED_LIFETIME;
-import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test for ServiceApiUtil helper methods.
- */
-public class TestServiceApiUtil {
- private static final Logger LOG = LoggerFactory
- .getLogger(TestServiceApiUtil.class);
- private static final String EXCEPTION_PREFIX = "Should have thrown " +
- "exception: ";
- private static final String NO_EXCEPTION_PREFIX = "Should not have thrown " +
- "exception: ";
-
- private static final String LEN_64_STR =
- "abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz01";
-
- private static final YarnConfiguration CONF_DEFAULT_DNS = new
- YarnConfiguration();
- private static final YarnConfiguration CONF_DNS_ENABLED = new
- YarnConfiguration();
-
- @BeforeClass
- public static void init() {
- CONF_DNS_ENABLED.setBoolean(RegistryConstants.KEY_DNS_ENABLED, true);
- }
-
- @Test(timeout = 90000)
- public void testResourceValidation() throws Exception {
- assertEquals(RegistryConstants.MAX_FQDN_LABEL_LENGTH + 1, LEN_64_STR
- .length());
-
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-
- Service app = new Service();
-
- // no name
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no name");
- } catch (IllegalArgumentException e) {
- assertEquals(ERROR_APPLICATION_NAME_INVALID, e.getMessage());
- }
-
- app.setName("test");
- // no version
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + " service with no version");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(ERROR_APPLICATION_VERSION_INVALID,
- app.getName()), e.getMessage());
- }
-
- app.setVersion("v1");
- // bad format name
- String[] badNames = {"4finance", "Finance", "finance@home", LEN_64_STR};
- for (String badName : badNames) {
- app.setName(badName);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with bad name " + badName);
- } catch (IllegalArgumentException e) {
-
- }
- }
-
- // launch command not specified
- app.setName(LEN_64_STR);
- Component comp = new Component().name("comp1");
- app.addComponent(comp);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DEFAULT_DNS);
- Assert.fail(EXCEPTION_PREFIX + "service with no launch command");
- } catch (IllegalArgumentException e) {
- assertEquals(RestApiErrorMessages.ERROR_ABSENT_LAUNCH_COMMAND,
- e.getMessage());
- }
-
- // launch command not specified
- app.setName(LEN_64_STR.substring(0, RegistryConstants
- .MAX_FQDN_LABEL_LENGTH));
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no launch command");
- } catch (IllegalArgumentException e) {
- assertEquals(RestApiErrorMessages.ERROR_ABSENT_LAUNCH_COMMAND,
- e.getMessage());
- }
-
- // memory not specified
- comp.setLaunchCommand("sleep 1");
- Resource res = new Resource();
- app.setResource(res);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no memory");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID,
- comp.getName()), e.getMessage());
- }
-
- // invalid no of cpus
- res.setMemory("100mb");
- res.setCpus(-2);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(
- EXCEPTION_PREFIX + "service with invalid no of cpus");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE,
- comp.getName()), e.getMessage());
- }
-
- // number of containers not specified
- res.setCpus(2);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no container count");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage()
- .contains(ERROR_CONTAINERS_COUNT_INVALID));
- }
-
- // specifying profile along with cpus/memory raises exception
- res.setProfile("hbase_finance_large");
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX
- + "service with resource profile along with cpus/memory");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(RestApiErrorMessages
- .ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED,
- comp.getName()),
- e.getMessage());
- }
-
- // currently resource profile alone is not supported.
- // TODO: remove the next test once resource profile alone is supported.
- res.setCpus(null);
- res.setMemory(null);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with resource profile only");
- } catch (IllegalArgumentException e) {
- assertEquals(ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET,
- e.getMessage());
- }
-
- // unset profile here and add cpus/memory back
- res.setProfile(null);
- res.setCpus(2);
- res.setMemory("2gb");
-
- // null number of containers
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "null number of containers");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith(ERROR_CONTAINERS_COUNT_INVALID));
- }
- }
-
- @Test
- public void testArtifacts() throws IOException {
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-
- Service app = new Service();
- app.setName("service1");
- app.setVersion("v1");
- Resource res = new Resource();
- app.setResource(res);
- res.setMemory("512M");
-
- // no artifact id fails with default type
- Artifact artifact = new Artifact();
- app.setArtifact(artifact);
- String compName = "comp1";
- Component comp = ServiceTestUtils.createComponent(compName);
-
- app.setComponents(Collections.singletonList(comp));
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no artifact id");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(ERROR_ARTIFACT_ID_FOR_COMP_INVALID, compName),
- e.getMessage());
- }
-
- // no artifact id fails with SERVICE type
- artifact.setType(Artifact.TypeEnum.SERVICE);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no artifact id");
- } catch (IllegalArgumentException e) {
- assertEquals(ERROR_ARTIFACT_ID_INVALID, e.getMessage());
- }
-
- // no artifact id fails with TARBALL type
- artifact.setType(Artifact.TypeEnum.TARBALL);
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with no artifact id");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(ERROR_ARTIFACT_ID_FOR_COMP_INVALID, compName),
- e.getMessage());
- }
-
- // everything valid here
- artifact.setType(Artifact.TypeEnum.DOCKER);
- artifact.setId("docker.io/centos:centos7");
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- LOG.error("service attributes specified should be valid here", e);
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
-
- assertEquals(app.getLifetime(), DEFAULT_UNLIMITED_LIFETIME);
- }
-
- private static Resource createValidResource() {
- Resource res = new Resource();
- res.setMemory("512M");
- return res;
- }
-
- private static Component createValidComponent(String compName) {
- Component comp = new Component();
- comp.setName(compName);
- comp.setResource(createValidResource());
- comp.setNumberOfContainers(1L);
- comp.setLaunchCommand("sleep 1");
- return comp;
- }
-
- private static Service createValidApplication(String compName) {
- Service app = new Service();
- app.setName("name");
- app.setVersion("v1");
- app.setResource(createValidResource());
- if (compName != null) {
- app.addComponent(createValidComponent(compName));
- }
- return app;
- }
-
- @Test
- public void testExternalApplication() throws IOException {
- Service ext = createValidApplication("comp1");
- SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext);
-
- Service app = createValidApplication(null);
-
- Artifact artifact = new Artifact();
- artifact.setType(Artifact.TypeEnum.SERVICE);
- artifact.setId("id");
- app.setArtifact(artifact);
- app.addComponent(ServiceTestUtils.createComponent("comp2"));
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
-
- assertEquals(1, app.getComponents().size());
- assertNotNull(app.getComponent("comp2"));
- }
-
- @Test
- public void testDuplicateComponents() throws IOException {
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-
- String compName = "comp1";
- Service app = createValidApplication(compName);
- app.addComponent(createValidComponent(compName));
-
- // duplicate component name fails
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with component collision");
- } catch (IllegalArgumentException e) {
- assertEquals("Component name collision: " + compName, e.getMessage());
- }
- }
-
- @Test
- public void testComponentNameSameAsServiceName() throws IOException {
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
- Service app = new Service();
- app.setName("test");
- app.setVersion("v1");
- app.addComponent(createValidComponent("test"));
-
- //component name same as service name
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "component name matches service name");
- } catch (IllegalArgumentException e) {
- assertEquals("Component name test must not be same as service name test",
- e.getMessage());
- }
- }
-
- @Test
- public void testExternalDuplicateComponent() throws IOException {
- Service ext = createValidApplication("comp1");
- SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext);
-
- Service app = createValidApplication("comp1");
- Artifact artifact = new Artifact();
- artifact.setType(Artifact.TypeEnum.SERVICE);
- artifact.setId("id");
- app.getComponent("comp1").setArtifact(artifact);
-
- // duplicate component name okay in the case of SERVICE component
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
- }
-
- @Test
- public void testExternalComponent() throws IOException {
- Service ext = createValidApplication("comp1");
- SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext);
-
- Service app = createValidApplication("comp2");
- Artifact artifact = new Artifact();
- artifact.setType(Artifact.TypeEnum.SERVICE);
- artifact.setId("id");
- app.setArtifact(artifact);
-
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
-
- assertEquals(1, app.getComponents().size());
- // artifact ID not inherited from global
- assertNotNull(app.getComponent("comp2"));
-
- // set SERVICE artifact id on component
- app.getComponent("comp2").setArtifact(artifact);
-
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
-
- assertEquals(1, app.getComponents().size());
- // original component replaced by external component
- assertNotNull(app.getComponent("comp1"));
- }
-
- public static void verifyDependencySorting(List<Component> components,
- Component... expectedSorting) {
- Collection<Component> actualSorting = ServiceApiUtil.sortByDependencies(
- components);
- assertEquals(expectedSorting.length, actualSorting.size());
- int i = 0;
- for (Component component : actualSorting) {
- assertEquals(expectedSorting[i++], component);
- }
- }
-
- @Test
- public void testDependencySorting() throws IOException {
- Component a = ServiceTestUtils.createComponent("a");
- Component b = ServiceTestUtils.createComponent("b");
- Component c = ServiceTestUtils.createComponent("c");
- Component d =
- ServiceTestUtils.createComponent("d").dependencies(Arrays.asList("c"));
- Component e = ServiceTestUtils.createComponent("e")
- .dependencies(Arrays.asList("b", "d"));
-
- verifyDependencySorting(Arrays.asList(a, b, c), a, b, c);
- verifyDependencySorting(Arrays.asList(c, a, b), c, a, b);
- verifyDependencySorting(Arrays.asList(a, b, c, d, e), a, b, c, d, e);
- verifyDependencySorting(Arrays.asList(e, d, c, b, a), c, b, a, d, e);
-
- c.setDependencies(Arrays.asList("e"));
- try {
- verifyDependencySorting(Arrays.asList(a, b, c, d, e));
- Assert.fail(EXCEPTION_PREFIX + "components with dependency cycle");
- } catch (IllegalArgumentException ex) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_DEPENDENCY_CYCLE, Arrays.asList(c, d,
- e)), ex.getMessage());
- }
-
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
- Service service = createValidApplication(null);
- service.setComponents(Arrays.asList(c, d, e));
- try {
- ServiceApiUtil.validateAndResolveService(service, sfs,
- CONF_DEFAULT_DNS);
- Assert.fail(EXCEPTION_PREFIX + "components with bad dependencies");
- } catch (IllegalArgumentException ex) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, "b", "e"), ex
- .getMessage());
- }
- }
-
- @Test
- public void testInvalidComponent() throws IOException {
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
- testComponent(sfs);
- }
-
- @Test
- public void testValidateCompName() {
- String[] invalidNames = {
- "EXAMPLE", // UPPER case not allowed
- "example_app" // underscore not allowed.
- };
- for (String name : invalidNames) {
- try {
- ServiceApiUtil.validateNameFormat(name, new Configuration());
- Assert.fail();
- } catch (IllegalArgumentException ex) {
- ex.printStackTrace();
- }
- }
- }
-
- private static void testComponent(SliderFileSystem sfs)
- throws IOException {
- int maxLen = RegistryConstants.MAX_FQDN_LABEL_LENGTH;
- assertEquals(19, Long.toString(Long.MAX_VALUE).length());
- maxLen = maxLen - Long.toString(Long.MAX_VALUE).length();
-
- String compName = LEN_64_STR.substring(0, maxLen + 1);
- Service app = createValidApplication(null);
- app.addComponent(createValidComponent(compName));
-
- // invalid component name fails if dns is enabled
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "service with invalid component name");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(RestApiErrorMessages
- .ERROR_COMPONENT_NAME_INVALID, maxLen, compName), e.getMessage());
- }
-
- // does not fail if dns is disabled
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DEFAULT_DNS);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
-
- compName = LEN_64_STR.substring(0, maxLen);
- app = createValidApplication(null);
- app.addComponent(createValidComponent(compName));
-
- // does not fail
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
- }
-
- @Test
- public void testPlacementPolicy() throws IOException {
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
- Service app = createValidApplication("comp-a");
- Component comp = app.getComponents().get(0);
- PlacementPolicy pp = new PlacementPolicy();
- PlacementConstraint pc = new PlacementConstraint();
- pc.setName("CA1");
- pp.setConstraints(Collections.singletonList(pc));
- comp.setPlacementPolicy(pp);
-
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "constraint with no type");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
- "CA1 ", "comp-a"), e.getMessage());
- }
-
- // Set the type
- pc.setType(PlacementType.ANTI_AFFINITY);
-
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "constraint with no scope");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
- "CA1 ", "comp-a"), e.getMessage());
- }
-
- // Set the scope
- pc.setScope(PlacementScope.NODE);
-
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "constraint with no tag(s)");
- } catch (IllegalArgumentException e) {
- assertEquals(String.format(
- RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
- "CA1 ", "comp-a"), e.getMessage());
- }
-
- // Set a target tag - but an invalid one
- pc.setTargetTags(Collections.singletonList("comp-invalid"));
-
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag name");
- } catch (IllegalArgumentException e) {
- assertEquals(
- String.format(
- RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
- "comp-invalid", "comp-a", "comp-a", "comp-a"),
- e.getMessage());
- }
-
- // Set valid target tags now
- pc.setTargetTags(Collections.singletonList("comp-a"));
-
- // Finally it should succeed
- try {
- ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
- }
-
- @Test
- public void testKerberosPrincipal() throws IOException {
- SliderFileSystem sfs = ServiceTestUtils.initMockFs();
- Service app = createValidApplication("comp-a");
- KerberosPrincipal kp = new KerberosPrincipal();
- kp.setKeytab("/some/path");
- kp.setPrincipalName("user/_HOST@domain.com");
- app.setKerberosPrincipal(kp);
-
- try {
- ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
- Assert.fail(EXCEPTION_PREFIX + "service with invalid keytab URI scheme");
- } catch (IllegalArgumentException e) {
- assertEquals(
- String.format(RestApiErrorMessages.ERROR_KEYTAB_URI_SCHEME_INVALID,
- kp.getKeytab()),
- e.getMessage());
- }
-
- kp.setKeytab("/ blank / in / paths");
- try {
- ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
- Assert.fail(EXCEPTION_PREFIX + "service with invalid keytab");
- } catch (IllegalArgumentException e) {
- // strip out the %s at the end of the RestApiErrorMessages string constant
- assertTrue(e.getMessage().contains(
- RestApiErrorMessages.ERROR_KEYTAB_URI_INVALID.substring(0,
- RestApiErrorMessages.ERROR_KEYTAB_URI_INVALID.length() - 2)));
- }
-
- kp.setKeytab("file:///tmp/a.keytab");
- // now it should succeed
- try {
- ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
- }
-
- @Test
- public void testKerberosPrincipalNameFormat() throws IOException {
- Service app = createValidApplication("comp-a");
- KerberosPrincipal kp = new KerberosPrincipal();
- kp.setPrincipalName("user@domain.com");
- app.setKerberosPrincipal(kp);
-
- try {
- ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
- Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name format.");
- } catch (IllegalArgumentException e) {
- assertEquals(
- String.format(RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT,
- kp.getPrincipalName()),
- e.getMessage());
- }
-
- kp.setPrincipalName("user/_HOST@domain.com");
- try {
- ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
- } catch (IllegalArgumentException e) {
- Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
index fc509f1..a37cabe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -19,23 +19,26 @@
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
-import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
/**
* Tests for {@link ServiceManager}.
@@ -46,117 +49,120 @@ public class TestServiceManager {
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
- @Test
- public void testUpgrade() throws IOException, SliderException {
- ServiceManager serviceManager = createTestServiceManager("testUpgrade");
- upgrade(serviceManager, "v2", false, false);
+ @Test (timeout = TIMEOUT)
+ public void testUpgrade() throws Exception {
+ ServiceContext context = createServiceContext("testUpgrade");
+ initUpgrade(context, "v2", false, false, false);
Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
- serviceManager.getServiceSpec().getState());
+ context.getServiceManager().getServiceSpec().getState());
}
- @Test
+ @Test (timeout = TIMEOUT)
public void testRestartNothingToUpgrade()
- throws IOException, SliderException {
- ServiceManager serviceManager = createTestServiceManager(
+ throws Exception {
+ ServiceContext context = createServiceContext(
"testRestartNothingToUpgrade");
- upgrade(serviceManager, "v2", false, false);
-
- //make components stable
- serviceManager.getServiceSpec().getComponents().forEach(comp -> {
- comp.setState(ComponentState.STABLE);
- });
- serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+ initUpgrade(context, "v2", false, false, false);
+ ServiceManager manager = context.getServiceManager();
+ //make components stable by upgrading all instances
+ upgradeAllInstances(context);
+
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ new ServiceEvent(ServiceEventType.START));
+ GenericTestUtils.waitFor(()->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
Assert.assertEquals("service not re-started", ServiceState.STABLE,
- serviceManager.getServiceSpec().getState());
+ manager.getServiceSpec().getState());
}
- @Test
- public void testAutoFinalizeNothingToUpgrade() throws IOException,
- SliderException {
- ServiceManager serviceManager = createTestServiceManager(
+ @Test(timeout = TIMEOUT)
+ public void testAutoFinalizeNothingToUpgrade() throws Exception {
+ ServiceContext context = createServiceContext(
"testAutoFinalizeNothingToUpgrade");
- upgrade(serviceManager, "v2", false, true);
-
- //make components stable
- serviceManager.getServiceSpec().getComponents().forEach(comp ->
- comp.setState(ComponentState.STABLE));
- serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
+ initUpgrade(context, "v2", false, true, false);
+ ServiceManager manager = context.getServiceManager();
+ //make components stable by upgrading all instances
+ upgradeAllInstances(context);
+
+ GenericTestUtils.waitFor(()->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
Assert.assertEquals("service stable", ServiceState.STABLE,
- serviceManager.getServiceSpec().getState());
+ manager.getServiceSpec().getState());
}
- @Test
+ @Test(timeout = TIMEOUT)
public void testRestartWithPendingUpgrade()
- throws IOException, SliderException {
- ServiceManager serviceManager = createTestServiceManager("testRestart");
- upgrade(serviceManager, "v2", true, false);
- serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+ throws Exception {
+ ServiceContext context = createServiceContext("testRestart");
+ initUpgrade(context, "v2", true, false, false);
+ ServiceManager manager = context.getServiceManager();
+
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ new ServiceEvent(ServiceEventType.START));
+ context.scheduler.getDispatcher().stop();
Assert.assertEquals("service should still be upgrading",
- ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+ ServiceState.UPGRADING, manager.getServiceSpec().getState());
}
- @Test
- public void testCheckState() throws IOException, SliderException {
- ServiceManager serviceManager = createTestServiceManager(
- "testCheckState");
- upgrade(serviceManager, "v2", true, false);
+ @Test(timeout = TIMEOUT)
+ public void testFinalize() throws Exception {
+ ServiceContext context = createServiceContext("testCheckState");
+ initUpgrade(context, "v2", true, false, false);
+ ServiceManager manager = context.getServiceManager();
Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
- serviceManager.getServiceSpec().getState());
+ manager.getServiceSpec().getState());
- // make components stable
- serviceManager.getServiceSpec().getComponents().forEach(comp -> {
- comp.setState(ComponentState.STABLE);
- });
- ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
- serviceManager.handle(checkStable);
- Assert.assertEquals("service should still be upgrading",
- ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+ //make components stable by upgrading all instances
+ upgradeAllInstances(context);
// finalize service
- ServiceEvent restart = new ServiceEvent(ServiceEventType.START);
- serviceManager.handle(restart);
- Assert.assertEquals("service not stable",
- ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+ context.scheduler.getDispatcher().getEventHandler().handle(
+ new ServiceEvent(ServiceEventType.START));
+ GenericTestUtils.waitFor(()->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
+ Assert.assertEquals("service not re-started", ServiceState.STABLE,
+ manager.getServiceSpec().getState());
- validateUpgradeFinalization(serviceManager.getName(), "v2");
+ validateUpgradeFinalization(manager.getName(), "v2");
}
- @Test
- public void testCheckStateAutoFinalize() throws IOException, SliderException {
- ServiceManager serviceManager = createTestServiceManager(
- "testCheckState");
- serviceManager.getServiceSpec().setState(
+ @Test(timeout = TIMEOUT)
+ public void testAutoFinalize() throws Exception {
+ ServiceContext context = createServiceContext("testCheckStateAutoFinalize");
+ ServiceManager manager = context.getServiceManager();
+ manager.getServiceSpec().setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
- upgrade(serviceManager, "v2", true, true);
- Assert.assertEquals("service not upgrading",
- ServiceState.UPGRADING_AUTO_FINALIZE,
- serviceManager.getServiceSpec().getState());
+ initUpgrade(context, "v2", true, true, false);
// make components stable
- serviceManager.getServiceSpec().getComponents().forEach(comp ->
- comp.setState(ComponentState.STABLE));
- ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
- serviceManager.handle(checkStable);
+ upgradeAllInstances(context);
+
+ GenericTestUtils.waitFor(() ->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
Assert.assertEquals("service not stable",
- ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+ ServiceState.STABLE, manager.getServiceSpec().getState());
- validateUpgradeFinalization(serviceManager.getName(), "v2");
+ validateUpgradeFinalization(manager.getName(), "v2");
}
@Test
- public void testInvalidUpgrade() throws IOException, SliderException {
- ServiceManager serviceManager = createTestServiceManager(
- "testInvalidUpgrade");
- serviceManager.getServiceSpec().setState(
+ public void testInvalidUpgrade() throws Exception {
+ ServiceContext serviceContext = createServiceContext("testInvalidUpgrade");
+ ServiceManager manager = serviceContext.getServiceManager();
+ manager.getServiceSpec().setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
Service upgradedDef = ServiceTestUtils.createExampleApplication();
- upgradedDef.setName(serviceManager.getName());
+ upgradedDef.setName(manager.getName());
upgradedDef.setVersion("v2");
upgradedDef.setLifetime(2L);
writeUpgradedDef(upgradedDef);
try {
- serviceManager.processUpgradeRequest("v2", true);
+ manager.processUpgradeRequest("v2", true, false);
} catch (Exception ex) {
Assert.assertTrue(ex instanceof UnsupportedOperationException);
return;
@@ -164,6 +170,32 @@ public class TestServiceManager {
Assert.fail();
}
+ @Test(timeout = TIMEOUT)
+ public void testExpressUpgrade() throws Exception {
+ ServiceContext context = createServiceContext("testExpressUpgrade");
+ ServiceManager manager = context.getServiceManager();
+ manager.getServiceSpec().setState(
+ ServiceState.EXPRESS_UPGRADING);
+ initUpgrade(context, "v2", true, true, true);
+
+ List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
+ // wait till instances of first component are in upgrade
+ String comp1 = comps.get(0);
+ upgradeInstancesOf(context, comp1);
+
+ // wait till instances of second component are in upgrade
+ String comp2 = comps.get(1);
+ upgradeInstancesOf(context, comp2);
+
+ GenericTestUtils.waitFor(() ->
+ context.service.getState().equals(ServiceState.STABLE),
+ CHECK_EVERY_MILLIS, TIMEOUT);
+
+ Assert.assertEquals("service not stable",
+ ServiceState.STABLE, manager.getServiceSpec().getState());
+ validateUpgradeFinalization(manager.getName(), "v2");
+ }
+
private void validateUpgradeFinalization(String serviceName,
String expectedVersion) throws IOException {
Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
@@ -172,15 +204,16 @@ public class TestServiceManager {
Assert.assertNotNull("app id not present", savedSpec.getId());
Assert.assertEquals("state not stable", ServiceState.STABLE,
savedSpec.getState());
- savedSpec.getComponents().forEach(compSpec -> {
- Assert.assertEquals("comp not stable", ComponentState.STABLE,
- compSpec.getState());
- });
+ savedSpec.getComponents().forEach(compSpec ->
+ Assert.assertEquals("comp not stable", ComponentState.STABLE,
+ compSpec.getState()));
}
- private void upgrade(ServiceManager serviceManager, String version,
- boolean upgradeArtifact, boolean autoFinalize)
- throws IOException, SliderException {
+ private void initUpgrade(ServiceContext context, String version,
+ boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade)
+ throws IOException, SliderException, TimeoutException,
+ InterruptedException {
+ ServiceManager serviceManager = context.getServiceManager();
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(serviceManager.getName());
upgradedDef.setVersion(version);
@@ -191,39 +224,81 @@ public class TestServiceManager {
});
}
writeUpgradedDef(upgradedDef);
- serviceManager.processUpgradeRequest(version, autoFinalize);
+ serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
- upgradeEvent.setVersion(version);
- if (autoFinalize) {
- upgradeEvent.setAutoFinalize(true);
- }
- serviceManager.handle(upgradeEvent);
+ upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade)
+ .setAutoFinalize(autoFinalize);
+
+ GenericTestUtils.waitFor(()-> {
+ ServiceState serviceState = context.service.getState();
+ if (serviceState.equals(ServiceState.UPGRADING) ||
+ serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
+ serviceState.equals(ServiceState.EXPRESS_UPGRADING)) {
+ return true;
+ }
+ return false;
+ }, CHECK_EVERY_MILLIS, TIMEOUT);
+ }
+
+ private void upgradeAllInstances(ServiceContext context) throws
+ TimeoutException, InterruptedException {
+ // upgrade the instances
+ context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
+ ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
+ ComponentInstanceEventType.UPGRADE);
+ context.scheduler.getDispatcher().getEventHandler().handle(event);
+ }));
+
+ // become ready
+ context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
+ ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
+ ComponentInstanceEventType.BECOME_READY);
+
+ context.scheduler.getDispatcher().getEventHandler().handle(event);
+ }));
+ GenericTestUtils.waitFor(()-> {
+ for (ComponentInstance instance:
+ context.scheduler.getLiveInstances().values()) {
+ if (!instance.getContainerState().equals(ContainerState.READY)) {
+ return false;
+ }
+ }
+ return true;
+ }, CHECK_EVERY_MILLIS, TIMEOUT);
}
- private ServiceManager createTestServiceManager(String name)
- throws IOException {
- ServiceContext context = new ServiceContext();
- context.service = createBaseDef(name);
- context.fs = rule.getFs();
-
- context.scheduler = new ServiceScheduler(context) {
- @Override
- protected YarnRegistryViewForProviders createYarnRegistryOperations(
- ServiceContext context, RegistryOperations registryClient) {
- return mock(YarnRegistryViewForProviders.class);
+ private void upgradeInstancesOf(ServiceContext context, String compName)
+ throws TimeoutException, InterruptedException {
+ Collection<ComponentInstance> compInstances = context.scheduler
+ .getAllComponents().get(compName).getAllComponentInstances();
+ GenericTestUtils.waitFor(() -> {
+ for (ComponentInstance instance : compInstances) {
+ if (!instance.getContainerState().equals(ContainerState.UPGRADING)) {
+ return false;
+ }
}
- };
+ return true;
+ }, CHECK_EVERY_MILLIS, TIMEOUT);
- context.scheduler.init(rule.getConf());
+ // once every instance of the component is upgrading, trigger the
+ // become ready event for each of them
+ compInstances.forEach(instance -> {
+ ComponentInstanceEvent event = new ComponentInstanceEvent(
+ instance.getContainer().getId(),
+ ComponentInstanceEventType.BECOME_READY);
- Map<String, org.apache.hadoop.yarn.service.component.Component>
- componentState = context.scheduler.getAllComponents();
- context.service.getComponents().forEach(component -> {
- componentState.put(component.getName(),
- new org.apache.hadoop.yarn.service.component.Component(component,
- 1L, context));
+ context.scheduler.getDispatcher().getEventHandler().handle(event);
});
- return new ServiceManager(context);
+ }
+
+ private ServiceContext createServiceContext(String name)
+ throws Exception {
+ Service service = createBaseDef(name);
+ ServiceContext context = new MockRunningServiceContext(rule,
+ service);
+ context.scheduler.getDispatcher().setDrainEventsOnStop();
+ context.scheduler.getDispatcher().start();
+ return context;
}
public static Service createBaseDef(String name) {
@@ -257,4 +332,6 @@ public class TestServiceManager {
upgradedDef);
}
+ private static final int TIMEOUT = 200000;
+ private static final int CHECK_EVERY_MILLIS = 100;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 8b13b24..216d88f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -415,6 +415,41 @@ public class TestYarnNativeServices extends ServiceTestUtils {
client.actionDestroy(service.getName());
}
+ @Test(timeout = 200000)
+ public void testExpressUpgrade() throws Exception {
+ setupInternal(NUM_NMS);
+ getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true);
+ ServiceClient client = createClient(getConf());
+
+ Service service = createExampleApplication();
+ client.actionCreate(service);
+ waitForServiceToBeStable(client, service);
+
+ // upgrade the service
+ Component component = service.getComponents().iterator().next();
+ service.setState(ServiceState.EXPRESS_UPGRADING);
+ service.setVersion("v2");
+ component.getConfiguration().getEnv().put("key1", "val1");
+ Component component2 = service.getComponent("compb");
+ component2.getConfiguration().getEnv().put("key2", "val2");
+ client.actionUpgradeExpress(service);
+
+ // wait for upgrade to complete
+ waitForServiceToBeStable(client, service);
+ Service active = client.getStatus(service.getName());
+ Assert.assertEquals("component not stable", ComponentState.STABLE,
+ active.getComponent(component.getName()).getState());
+ Assert.assertEquals("compa does not have new env", "val1",
+ active.getComponent(component.getName()).getConfiguration()
+ .getEnv("key1"));
+ Assert.assertEquals("compb does not have new env", "val2",
+ active.getComponent(component2.getName()).getConfiguration()
+ .getEnv("key2"));
+ LOG.info("Stop/destroy service {}", service);
+ client.actionStop(service.getName(), true);
+ client.actionDestroy(service.getName());
+ }
+
// Test to verify ANTI_AFFINITY placement policy
// 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
// 2. Create an example service with 3 containers
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org