You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2020/07/09 07:11:12 UTC
[hadoop] branch trunk updated: YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.
This is an automated email from the ASF dual-hosted git repository.
brahma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfe6039 YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.
dfe6039 is described below
commit dfe60392c91be21f574c1659af22f5c381b2675a
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Thu Jul 9 12:34:52 2020 +0530
YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.
---
.../hadoop/yarn/service/ServiceScheduler.java | 2 +-
.../apache/hadoop/yarn/service/TestServiceAM.java | 88 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 1 deletion(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 458a7a1..0d77479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -737,7 +737,7 @@ public class ServiceScheduler extends CompositeService {
LOG.warn(
"Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
containerId, status.getExitStatus(), status.getDiagnostics());
- return;
+ continue;
}
ComponentEvent event =
new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
index bbcbee2..5b961a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -22,22 +22,29 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
@@ -47,7 +54,9 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +72,8 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class TestServiceAM extends ServiceTestUtils{
@@ -72,6 +83,9 @@ public class TestServiceAM extends ServiceTestUtils{
private File basedir;
YarnConfiguration conf = new YarnConfiguration();
TestingCluster zkCluster;
+ @Rule
+ public ServiceTestUtils.ServiceFSWatcher rule =
+ new ServiceTestUtils.ServiceFSWatcher();
@Before
public void setup() throws Exception {
@@ -312,6 +326,80 @@ public class TestServiceAM extends ServiceTestUtils{
}
@Test
+ public void testContainerCompletedEventProcessed() throws Exception {
+ ServiceContext context = createServiceContext("abc");
+ MockServiceScheduler scheduler = new MockServiceScheduler(context);
+ scheduler.init(conf);
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 1);
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 0);
+ ContainerStatus containerStatus1 = ContainerStatus.newInstance(containerId1,
+ org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+ "successful", 0);
+ ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 1);
+ ContainerStatus containerStatus2 = ContainerStatus.newInstance(containerId2,
+ org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+ "successful", 0);
+ ComponentInstance instance = Mockito.mock(ComponentInstance.class);
+ Mockito.doReturn("componentInstance").when(instance).getCompName();
+ scheduler.addLiveCompInstance(containerId2, instance);
+ List<ContainerStatus> statuses = new ArrayList<>();
+ // First container instance will be null
+ statuses.add(containerStatus1);
+ // Second container instance is added
+ scheduler.addLiveCompInstance(containerId2, instance);
+ statuses.add(containerStatus2);
+ scheduler.callbackHandler.onContainersCompleted(statuses);
+ // For second container event should be dispatched.
+ verify(scheduler.dispatcher, times(1)).getEventHandler();
+ DefaultMetricsSystem.shutdown();
+ }
+
+ private ServiceContext createServiceContext(String name)
+ throws Exception {
+ Artifact artifact = new Artifact();
+ artifact.setId("1");
+ artifact.setType(Artifact.TypeEnum.TARBALL);
+ Service serviceDef = ServiceTestUtils.createExampleApplication();
+ ApplicationId applicationId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ serviceDef.setId(applicationId.toString());
+ serviceDef.setName(name);
+ serviceDef.setState(ServiceState.STARTED);
+ serviceDef.getComponents().forEach(component ->
+ component.setArtifact(artifact));
+ ServiceContext context = new MockRunningServiceContext(rule,
+ serviceDef);
+ context.scheduler.getDispatcher().setDrainEventsOnStop();
+ context.scheduler.getDispatcher().start();
+ return context;
+ }
+
+ class MockServiceScheduler extends ServiceScheduler {
+ private AsyncDispatcher dispatcher;
+ private AMRMClientCallback callbackHandler = new AMRMClientCallback();
+
+ MockServiceScheduler(ServiceContext context) {
+ super(context);
+ }
+
+ @Override
+ protected AsyncDispatcher createAsyncDispatcher() {
+ dispatcher = Mockito.mock(AsyncDispatcher.class);
+ EventHandler<Event> handler = Mockito.mock(EventHandler.class);
+ Mockito.doReturn(handler).when(dispatcher).getEventHandler();
+ return dispatcher;
+ }
+
+ @Override
+ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
+ return AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
+ }
+
+ }
+
+ @Test
public void testRecordTokensForContainers() throws Exception {
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
Service exampleApp = new Service();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org