You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:10:48 UTC
[22/47] hadoop git commit: YARN-8488. Added SUCCEEDED/FAILED states
to YARN service. Contributed by Suma Shivaprasad
YARN-8488. Added SUCCEEDED/FAILED states to YARN service.
Contributed by Suma Shivaprasad
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd089caf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd089caf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd089caf
Branch: refs/heads/HDFS-12943
Commit: fd089caf69cf608a91564c9c3d20cbf84e7fd60c
Parents: c61824a
Author: Eric Yang <ey...@apache.org>
Authored: Tue Aug 28 13:55:28 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Tue Aug 28 13:55:28 2018 -0400
----------------------------------------------------------------------
.../hadoop/yarn/service/ServiceScheduler.java | 100 ++++++++++---
.../service/api/records/ComponentState.java | 2 +-
.../service/api/records/ContainerState.java | 3 +-
.../yarn/service/api/records/ServiceState.java | 2 +-
.../component/instance/ComponentInstance.java | 144 ++++++++++++++-----
.../timelineservice/ServiceTimelineEvent.java | 5 +-
.../ServiceTimelinePublisher.java | 33 ++++-
.../yarn/service/MockRunningServiceContext.java | 18 ++-
.../hadoop/yarn/service/ServiceTestUtils.java | 9 +-
.../yarn/service/component/TestComponent.java | 55 ++++++-
.../component/TestComponentRestartPolicy.java | 1 -
.../instance/TestComponentInstance.java | 35 ++---
.../TestServiceTimelinePublisher.java | 4 +-
13 files changed, 322 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 384659f..b49ef2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
@@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.util.BoundedAppender;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +105,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
-import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
+import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
+ .KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
@@ -137,6 +141,8 @@ public class ServiceScheduler extends CompositeService {
private ServiceTimelinePublisher serviceTimelinePublisher;
+ private boolean timelineServiceEnabled;
+
// Global diagnostics that will be reported to RM on eRxit.
// The unit the number of characters. This will be limited to 64 * 1024
// characters.
@@ -169,6 +175,8 @@ public class ServiceScheduler extends CompositeService {
private volatile FinalApplicationStatus finalApplicationStatus =
FinalApplicationStatus.ENDED;
+ private Clock systemClock;
+
// For unit test override since we don't want to terminate UT process.
private ServiceUtils.ProcessTerminationHandler
terminationHandler = new ServiceUtils.ProcessTerminationHandler();
@@ -176,6 +184,8 @@ public class ServiceScheduler extends CompositeService {
public ServiceScheduler(ServiceContext context) {
super(context.getService().getName());
this.context = context;
+ this.app = context.getService();
+ this.systemClock = SystemClock.getInstance();
}
public void buildInstance(ServiceContext context, Configuration configuration)
@@ -254,8 +264,14 @@ public class ServiceScheduler extends CompositeService {
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
app.getConfiguration(), getConfig());
+ if (YarnConfiguration
+ .timelineServiceV2Enabled(getConfig())) {
+ timelineServiceEnabled = true;
+ }
+
serviceManager = createServiceManager();
context.setServiceManager(serviceManager);
+
}
protected YarnRegistryViewForProviders createYarnRegistryOperations(
@@ -311,21 +327,38 @@ public class ServiceScheduler extends CompositeService {
// only stop the entire service when a graceful stop has been initiated
// (e.g. via client RPC, not through the AM receiving a SIGTERM)
if (gracefulStop) {
+
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- // mark component-instances/containers as STOPPED
- for (ContainerId containerId : getLiveInstances().keySet()) {
- serviceTimelinePublisher.componentInstanceFinished(containerId,
- KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
+
+ // mark other component-instances/containers as STOPPED
+ final Map<ContainerId, ComponentInstance> liveInst =
+ getLiveInstances();
+ for (Map.Entry<ContainerId, ComponentInstance> instance : liveInst
+ .entrySet()) {
+ if (!ComponentInstance.isFinalState(
+ instance.getValue().getContainerSpec().getState())) {
+ LOG.info("{} Component instance state changed from {} to {}",
+ instance.getValue().getCompInstanceName(),
+ instance.getValue().getContainerSpec().getState(),
+ ContainerState.STOPPED);
+ serviceTimelinePublisher.componentInstanceFinished(
+ instance.getKey(), KILLED_AFTER_APP_COMPLETION,
+ ContainerState.STOPPED, getDiagnostics().toString());
+ }
}
+
+ LOG.info("Service state changed to {}", finalApplicationStatus);
// mark attempt as unregistered
- serviceTimelinePublisher
- .serviceAttemptUnregistered(context, diagnostics.toString());
+ serviceTimelinePublisher.serviceAttemptUnregistered(context,
+ finalApplicationStatus, diagnostics.toString());
}
+
// unregister AM
- amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
+ amRMClient.unregisterApplicationMaster(finalApplicationStatus,
diagnostics.toString(), "");
- LOG.info("Service {} unregistered with RM, with attemptId = {} " +
- ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
+ LOG.info("Service {} unregistered with RM, with attemptId = {} "
+ + ", diagnostics = {} ", app.getName(), context.attemptId,
+ diagnostics);
}
super.serviceStop();
}
@@ -911,7 +944,7 @@ public class ServiceScheduler extends CompositeService {
* (which #failed-instances + #suceeded-instances = #total-n-containers)
* The service will be terminated.
*/
- public synchronized void terminateServiceIfAllComponentsFinished() {
+ public void terminateServiceIfAllComponentsFinished() {
boolean shouldTerminate = true;
// Succeeded comps and failed comps, for logging purposes.
@@ -920,7 +953,30 @@ public class ServiceScheduler extends CompositeService {
for (Component comp : getAllComponents().values()) {
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
- if (!restartPolicy.shouldTerminate(comp)) {
+
+ if (restartPolicy.shouldTerminate(comp)) {
+ if (restartPolicy.hasCompletedSuccessfully(comp)) {
+ comp.getComponentSpec().setState(org.apache.hadoop
+ .yarn.service.api.records.ComponentState.SUCCEEDED);
+ LOG.info("{} Component state changed from {} to {}",
+ comp.getName(), comp.getComponentSpec().getState(),
+ org.apache.hadoop
+ .yarn.service.api.records.ComponentState.SUCCEEDED);
+ } else {
+ comp.getComponentSpec().setState(org.apache.hadoop
+ .yarn.service.api.records.ComponentState.FAILED);
+ LOG.info("{} Component state changed from {} to {}",
+ comp.getName(), comp.getComponentSpec().getState(),
+ org.apache.hadoop
+ .yarn.service.api.records.ComponentState.FAILED);
+ }
+
+ if (isTimelineServiceEnabled()) {
+ // record in ATS
+ serviceTimelinePublisher.componentFinished(comp.getComponentSpec(),
+ comp.getComponentSpec().getState(), systemClock.getTime());
+ }
+ } else {
shouldTerminate = false;
break;
}
@@ -929,7 +985,7 @@ public class ServiceScheduler extends CompositeService {
if (nFailed > 0) {
failedComponents.add(comp.getName());
- } else{
+ } else {
succeededComponents.add(comp.getName());
}
}
@@ -944,16 +1000,28 @@ public class ServiceScheduler extends CompositeService {
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
.join(failedComponents, ",") + "]");
+ int exitStatus = EXIT_SUCCESS;
if (failedComponents.isEmpty()) {
setGracefulStop(FinalApplicationStatus.SUCCEEDED);
- getTerminationHandler().terminate(EXIT_SUCCESS);
- } else{
+ app.setState(ServiceState.SUCCEEDED);
+ } else {
setGracefulStop(FinalApplicationStatus.FAILED);
- getTerminationHandler().terminate(EXIT_FALSE);
+ app.setState(ServiceState.FAILED);
+ exitStatus = EXIT_FALSE;
}
+
+ getTerminationHandler().terminate(exitStatus);
}
}
+ public Clock getSystemClock() {
+ return systemClock;
+ }
+
+ public boolean isTimelineServiceEnabled() {
+ return timelineServiceEnabled;
+ }
+
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
return terminationHandler;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
index 3e7ed11..472f374 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
@@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
@ApiModel(description = "The current state of a component.")
public enum ComponentState {
- FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING;
+ FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, FAILED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
index 6e39073..cac527a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
@@ -26,5 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum ContainerState {
- RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING;
+ RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
+ FAILED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index 0b3c037..3876173 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
- UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING;
+ UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index ed5e68e..afd8c67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -41,7 +42,9 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
@@ -68,6 +71,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
+
+import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
+ .KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
@@ -242,15 +248,22 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
@VisibleForTesting
- static void handleComponentInstanceRelaunch(
- ComponentInstance compInstance, ComponentInstanceEvent event,
- boolean failureBeforeLaunch) {
+ static void handleComponentInstanceRelaunch(ComponentInstance compInstance,
+ ComponentInstanceEvent event, boolean failureBeforeLaunch,
+ String containerDiag) {
Component comp = compInstance.getComponent();
// Do we need to relaunch the service?
- boolean hasContainerFailed = hasContainerFailed(event.getStatus());
+ boolean hasContainerFailed = failureBeforeLaunch || hasContainerFailed(
+ event.getStatus());
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
+ ContainerState containerState =
+ hasContainerFailed ? ContainerState.FAILED : ContainerState.SUCCEEDED;
+
+ if (compInstance.getContainerSpec() != null) {
+ compInstance.getContainerSpec().setState(containerState);
+ }
if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
// re-ask the failed container.
@@ -259,25 +272,47 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
StringBuilder builder = new StringBuilder();
builder.append(compInstance.getCompInstanceId()).append(": ");
- builder.append(event.getContainerId()).append(" completed. Reinsert back to pending list and requested ");
+ builder.append(event.getContainerId()).append(
+ " completed. Reinsert back to pending list and requested ");
builder.append("a new container.").append(System.lineSeparator());
- builder.append(" exitStatus=").append(failureBeforeLaunch ? null : event.getStatus().getExitStatus());
+ builder.append(" exitStatus=").append(
+ failureBeforeLaunch ? null : event.getStatus().getExitStatus());
builder.append(", diagnostics=");
- builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics());
+ builder.append(failureBeforeLaunch ?
+ FAILED_BEFORE_LAUNCH_DIAG :
+ event.getStatus().getDiagnostics());
- if (event.getStatus().getExitStatus() != 0) {
+ if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
LOG.error(builder.toString());
- } else {
+ } else{
LOG.info(builder.toString());
}
- } else {
+
+ if (compInstance.timelineServiceEnabled) {
+ // record in ATS
+ LOG.info("Publishing component instance status {} {} ",
+ event.getContainerId(), containerState);
+ compInstance.serviceTimelinePublisher.componentInstanceFinished(
+ event.getContainerId(), event.getStatus().getExitStatus(),
+ containerState, containerDiag);
+ }
+
+ } else{
// When no relaunch, update component's #succeeded/#failed
// instances.
if (hasContainerFailed) {
comp.markAsFailed(compInstance);
- } else {
+ } else{
comp.markAsSucceeded(compInstance);
}
+
+ if (compInstance.timelineServiceEnabled) {
+ // record in ATS
+ compInstance.serviceTimelinePublisher.componentInstanceFinished(
+ event.getContainerId(), event.getStatus().getExitStatus(),
+ containerState, containerDiag);
+ }
+
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
" succeeded" :
" failed") + " without retry, exitStatus=" + event.getStatus());
@@ -287,8 +322,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public static boolean hasContainerFailed(ContainerStatus containerStatus) {
//Mark conainer as failed if we cant get its exit status i.e null?
- return containerStatus == null || containerStatus.getExitStatus() !=
- ContainerExitStatus.SUCCESS;
+ return containerStatus == null || containerStatus
+ .getExitStatus() != ContainerExitStatus.SUCCESS;
}
private static class ContainerStoppedTransition extends BaseTransition {
@@ -307,9 +342,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
ComponentInstanceEvent event) {
Component comp = compInstance.component;
- String containerDiag =
- compInstance.getCompInstanceId() + ": " + (failedBeforeLaunching ?
- FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics());
+ String containerDiag = compInstance.getCompInstanceId() + ": " + (
+ failedBeforeLaunching ?
+ FAILED_BEFORE_LAUNCH_DIAG :
+ event.getStatus().getDiagnostics());
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
@@ -329,36 +365,69 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// Check if it exceeds the failure threshold, but only if health threshold
// monitor is not enabled
if (!comp.isHealthThresholdMonitorEnabled()
- && comp.currentContainerFailure
- .get() > comp.maxContainerFailurePerComp) {
+ && comp.currentContainerFailure.get()
+ > comp.maxContainerFailurePerComp) {
String exitDiag = MessageFormat.format(
- "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... "
- + System.lineSeparator(),
- comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
+ "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. "
+ + "Shutting down now... "
+ + System.lineSeparator(), comp.getName(),
+ comp.currentContainerFailure.get(),
+ comp.maxContainerFailurePerComp);
compInstance.diagnostics.append(exitDiag);
// append to global diagnostics that will be reported to RM.
scheduler.getDiagnostics().append(containerDiag);
scheduler.getDiagnostics().append(exitDiag);
LOG.warn(exitDiag);
+
+ compInstance.getContainerSpec().setState(ContainerState.FAILED);
+ comp.getComponentSpec().setState(ComponentState.FAILED);
+ comp.getScheduler().getApp().setState(ServiceState.FAILED);
+
+ if (compInstance.timelineServiceEnabled) {
+ // record in ATS
+ compInstance.scheduler.getServiceTimelinePublisher()
+ .componentInstanceFinished(compInstance.getContainer().getId(),
+ failedBeforeLaunching ?
+ -1 :
+ event.getStatus().getExitStatus(), ContainerState.FAILED,
+ containerDiag);
+
+ // mark other component-instances/containers as STOPPED
+ for (ContainerId containerId : scheduler.getLiveInstances()
+ .keySet()) {
+ if (!compInstance.container.getId().equals(containerId)
+ && !isFinalState(compInstance.getContainerSpec().getState())) {
+ compInstance.getContainerSpec().setState(ContainerState.STOPPED);
+ compInstance.scheduler.getServiceTimelinePublisher()
+ .componentInstanceFinished(containerId,
+ KILLED_AFTER_APP_COMPLETION, ContainerState.STOPPED,
+ scheduler.getDiagnostics().toString());
+ }
+ }
+
+ compInstance.scheduler.getServiceTimelinePublisher()
+ .componentFinished(comp.getComponentSpec(), ComponentState.FAILED,
+ scheduler.getSystemClock().getTime());
+
+ compInstance.scheduler.getServiceTimelinePublisher()
+ .serviceAttemptUnregistered(comp.getContext(),
+ FinalApplicationStatus.FAILED,
+ scheduler.getDiagnostics().toString());
+ }
+
shouldFailService = true;
}
if (!failedBeforeLaunching) {
// clean up registry
- // If the container failed before launching, no need to cleanup registry,
+ // If the container failed before launching, no need to cleanup
+ // registry,
// because it was not registered before.
- // hdfs dir content will be overwritten when a new container gets started,
+ // hdfs dir content will be overwritten when a new container gets
+ // started,
// so no need remove.
- compInstance.scheduler.executorService
- .submit(() -> compInstance.cleanupRegistry(event.getContainerId()));
-
- if (compInstance.timelineServiceEnabled) {
- // record in ATS
- compInstance.serviceTimelinePublisher
- .componentInstanceFinished(event.getContainerId(),
- event.getStatus().getExitStatus(), containerDiag);
- }
- compInstance.containerSpec.setState(ContainerState.STOPPED);
+ compInstance.scheduler.executorService.submit(
+ () -> compInstance.cleanupRegistry(event.getContainerId()));
}
// remove the failed ContainerId -> CompInstance mapping
@@ -367,7 +436,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// According to component restart policy, handle container restart
// or finish the service (if all components finished)
handleComponentInstanceRelaunch(compInstance, event,
- failedBeforeLaunching);
+ failedBeforeLaunching, containerDiag);
if (shouldFailService) {
scheduler.getTerminationHandler().terminate(-1);
@@ -375,6 +444,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
+ public static boolean isFinalState(ContainerState state) {
+ return ContainerState.FAILED.equals(state) || ContainerState.STOPPED
+ .equals(state) || ContainerState.SUCCEEDED.equals(state);
+ }
+
private static class ContainerUpgradeTransition extends BaseTransition {
@Override
@@ -586,7 +660,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceFinished(containerId,
- KILLED_BY_APPMASTER, diagnostics.toString());
+ KILLED_BY_APPMASTER, ContainerState.STOPPED, diagnostics.toString());
}
cancelContainerStatusRetriever();
scheduler.executorService.submit(() ->
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
index 6c3428a..832dad7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
@@ -32,5 +32,8 @@ public enum ServiceTimelineEvent {
COMPONENT_INSTANCE_IP_HOST_UPDATE,
- COMPONENT_INSTANCE_BECOME_READY
+ COMPONENT_INSTANCE_BECOME_READY,
+
+ COMPONENT_FINISHED
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
index 6c73ebb..79f37c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.*;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger;
@@ -42,7 +44,6 @@ import java.util.Map.Entry;
import java.util.Set;
import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY;
-import static org.apache.hadoop.yarn.service.api.records.ContainerState.STOPPED;
import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
/**
@@ -130,12 +131,11 @@ public class ServiceTimelinePublisher extends CompositeService {
}
public void serviceAttemptUnregistered(ServiceContext context,
- String diagnostics) {
+ FinalApplicationStatus status, String diagnostics) {
TimelineEntity entity = createServiceAttemptEntity(
context.attemptId.getApplicationId().toString());
Map<String, Object> entityInfos = new HashMap<String, Object>();
- entityInfos.put(ServiceTimelineMetricsConstants.STATE,
- FinalApplicationStatus.ENDED);
+ entityInfos.put(ServiceTimelineMetricsConstants.STATE, status);
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entity.addInfo(entityInfos);
@@ -180,7 +180,7 @@ public class ServiceTimelinePublisher extends CompositeService {
}
public void componentInstanceFinished(ContainerId containerId,
- int exitCode, String diagnostics) {
+ int exitCode, ContainerState state, String diagnostics) {
TimelineEntity entity = createComponentInstanceEntity(
containerId.toString());
@@ -189,7 +189,7 @@ public class ServiceTimelinePublisher extends CompositeService {
entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
exitCode);
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
- entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED);
+ entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
entity.addInfo(entityInfos);
// add an event
@@ -375,4 +375,25 @@ public class ServiceTimelinePublisher extends CompositeService {
log.error("Error when publishing entity " + entity, e);
}
}
+
+ public void componentFinished(
+ Component comp,
+ ComponentState state, long finishTime) {
+ createComponentEntity(comp.getName());
+ TimelineEntity entity = createComponentEntity(comp.getName());
+
+ // create info keys
+ Map<String, Object> entityInfos = new HashMap<String, Object>();
+ entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
+ entity.addInfo(entityInfos);
+
+ // add an event
+ TimelineEvent startEvent = new TimelineEvent();
+ startEvent
+ .setId(ServiceTimelineEvent.COMPONENT_FINISHED.toString());
+ startEvent.setTimestamp(finishTime);
+ entity.addEvent(startEvent);
+
+ putEntity(entity);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
index 89888c5..321b2cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.mockito.stubbing.Answer;
import java.io.IOException;
@@ -92,7 +93,18 @@ public class MockRunningServiceContext extends ServiceContext {
public ContainerLaunchService getContainerLaunchService() {
return mockLaunchService;
}
+
+ @Override public ServiceUtils.ProcessTerminationHandler
+ getTerminationHandler() {
+ return new
+ ServiceUtils.ProcessTerminationHandler() {
+ public void terminate(int exitCode) {
+ }
+ };
+ }
};
+
+
this.scheduler.init(fsWatcher.getConf());
ServiceTestUtils.createServiceManager(this);
@@ -116,8 +128,10 @@ public class MockRunningServiceContext extends ServiceContext {
Component component = new org.apache.hadoop.yarn.service.component.
Component(componentSpec, 1L, context);
componentState.put(component.getName(), component);
- component.handle(new ComponentEvent(component.getName(),
- ComponentEventType.FLEX));
+ component.handle(
+ new ComponentEvent(component.getName(), ComponentEventType.FLEX)
+ .setDesired(
+ component.getComponentSpec().getNumberOfContainers()));
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
counter++;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 170c20b..6b49ab0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -63,7 +63,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -119,14 +118,10 @@ public class ServiceTestUtils {
Component.RestartPolicyEnum.NEVER, null));
exampleApp.addComponent(
createComponent("terminating-comp2", 2, "sleep 1000",
- Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
- add("terminating-comp1");
- }}));
+ Component.RestartPolicyEnum.ON_FAILURE, null));
exampleApp.addComponent(
createComponent("terminating-comp3", 2, "sleep 1000",
- Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
- add("terminating-comp2");
- }}));
+ Component.RestartPolicyEnum.ON_FAILURE, null));
return exampleApp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
index 2e17c7f..e1a4d9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@@ -147,7 +148,8 @@ public class TestComponent {
}
@Test
- public void testComponentStateUpdatesWithTerminatingComponents() throws
+ public void testComponentStateReachesStableStateWithTerminatingComponents()
+ throws
Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingComponents";
@@ -198,6 +200,57 @@ public class TestComponent {
}
}
+ @Test
+ public void testComponentStateUpdatesWithTerminatingComponents()
+ throws
+ Exception {
+ final String serviceName =
+ "testComponentStateUpdatesWithTerminatingComponents";
+
+ Service testService = ServiceTestUtils.createTerminatingJobExample(
+ serviceName);
+ TestServiceManager.createDef(serviceName, testService);
+
+ ServiceContext context = new MockRunningServiceContext(rule, testService);
+
+ for (Component comp : context.scheduler.getAllComponents().values()) {
+ Iterator<ComponentInstance> instanceIter = comp.
+ getAllComponentInstances().iterator();
+
+ while (instanceIter.hasNext()) {
+
+ ComponentInstance componentInstance = instanceIter.next();
+ Container instanceContainer = componentInstance.getContainer();
+
+ //stop 1 container
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ instanceContainer.getId(),
+ org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+ "successful", 0);
+ comp.handle(new ComponentEvent(comp.getName(),
+ ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
+ .setContainerId(instanceContainer.getId()));
+ componentInstance.handle(
+ new ComponentInstanceEvent(componentInstance.getContainer().getId(),
+ ComponentInstanceEventType.STOP).setStatus(containerStatus));
+ }
+
+ ComponentState componentState =
+ comp.getComponentSpec().getState();
+ Assert.assertEquals(
+ ComponentState.SUCCEEDED,
+ componentState);
+ }
+
+ ServiceState serviceState =
+ testService.getState();
+ Assert.assertEquals(
+ ServiceState.SUCCEEDED,
+ serviceState);
+ }
+
+
+
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
index 60f5c91..03158cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
@@ -110,7 +110,6 @@ public class TestComponentRestartPolicy {
assertEquals(true, restartPolicy.isReadyForDownStream(component));
-
when(component.getNumSucceededInstances()).thenReturn(new Long(2));
when(component.getNumFailedInstances()).thenReturn(new Long(1));
when(component.getNumDesiredInstances()).thenReturn(3);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
index f428904..e039981 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
@@ -204,6 +204,8 @@ public class TestComponentInstance {
when(componentInstance.getComponent()).thenReturn(component);
when(componentInstance.getCompInstanceName()).thenReturn(
"compInstance" + instanceId);
+ Container container = mock(Container.class);
+ when(componentInstance.getContainerSpec()).thenReturn(container);
ServiceUtils.ProcessTerminationHandler terminationHandler = mock(
ServiceUtils.ProcessTerminationHandler.class);
@@ -227,12 +229,15 @@ public class TestComponentInstance {
Mockito.doNothing().when(serviceScheduler).setGracefulStop(
any(FinalApplicationStatus.class));
+ final String containerDiag = "Container succeeded";
+
ComponentInstanceEvent componentInstanceEvent = mock(
ComponentInstanceEvent.class);
ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.newInstance(1234L, 1), 1), 1);
ContainerStatus containerStatus = ContainerStatus.newInstance(containerId,
- org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0);
+ org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+ containerDiag, 0);
when(componentInstanceEvent.getStatus()).thenReturn(containerStatus);
@@ -245,7 +250,7 @@ public class TestComponentInstance {
comp.getAllComponentInstances().iterator().next();
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
@@ -262,7 +267,7 @@ public class TestComponentInstance {
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
@@ -286,7 +291,7 @@ public class TestComponentInstance {
when(comp.getNumSucceededInstances()).thenReturn(new Long(1));
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
@@ -304,8 +309,7 @@ public class TestComponentInstance {
when(comp.getNumFailedInstances()).thenReturn(new Long(1));
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
- componentInstanceEvent, false);
-
+ componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
@@ -323,7 +327,7 @@ public class TestComponentInstance {
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
@@ -340,7 +344,7 @@ public class TestComponentInstance {
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
@@ -363,8 +367,7 @@ public class TestComponentInstance {
containerStatus.setExitStatus(1);
ComponentInstance commponentInstance = iter.next();
ComponentInstance.handleComponentInstanceRelaunch(commponentInstance,
- componentInstanceEvent, false);
-
+ componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
@@ -404,7 +407,7 @@ public class TestComponentInstance {
when(component2Instance.getComponent().getNumFailedInstances())
.thenReturn(new Long(failed2Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
}
Map<String, ComponentInstance> failed1Instances = new HashMap<>();
@@ -418,7 +421,7 @@ public class TestComponentInstance {
when(component1Instance.getComponent().getNumFailedInstances())
.thenReturn(new Long(failed1Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
}
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
@@ -458,7 +461,7 @@ public class TestComponentInstance {
when(component2Instance.getComponent().getNumSucceededInstances())
.thenReturn(new Long(succeeded2Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
}
Map<String, ComponentInstance> succeeded1Instances = new HashMap<>();
@@ -471,7 +474,7 @@ public class TestComponentInstance {
when(component1Instance.getComponent().getNumSucceededInstances())
.thenReturn(new Long(succeeded1Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
}
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
@@ -500,7 +503,7 @@ public class TestComponentInstance {
for (ComponentInstance component2Instance : component2Instances) {
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
}
succeeded1Instances = new HashMap<>();
@@ -511,7 +514,7 @@ public class TestComponentInstance {
when(component1Instance.getComponent().getSucceededInstances())
.thenReturn(succeeded1Instances.values());
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
- componentInstanceEvent, false);
+ componentInstanceEvent, false, containerDiag);
}
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd089caf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
index cff7229..a77e6c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.timelineservice;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -122,7 +123,8 @@ public class TestServiceTimelinePublisher {
context.attemptId = ApplicationAttemptId
.newInstance(ApplicationId.fromString(service.getId()), 1);
String exitDiags = "service killed";
- serviceTimelinePublisher.serviceAttemptUnregistered(context, exitDiags);
+ serviceTimelinePublisher.serviceAttemptUnregistered(context,
+ FinalApplicationStatus.ENDED, exitDiags);
lastPublishedEntities =
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
for (TimelineEntity timelineEntity : lastPublishedEntities) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org