You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2020/07/09 07:11:12 UTC

[hadoop] branch trunk updated: YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.

This is an automated email from the ASF dual-hosted git repository.

brahma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfe6039  YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.
dfe6039 is described below

commit dfe60392c91be21f574c1659af22f5c381b2675a
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Thu Jul 9 12:34:52 2020 +0530

    YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.
---
 .../hadoop/yarn/service/ServiceScheduler.java      |  2 +-
 .../apache/hadoop/yarn/service/TestServiceAM.java  | 88 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 458a7a1..0d77479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -737,7 +737,7 @@ public class ServiceScheduler extends CompositeService {
           LOG.warn(
               "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
               containerId, status.getExitStatus(), status.getDiagnostics());
-          return;
+          continue;
         }
         ComponentEvent event =
             new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
index bbcbee2..5b961a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -22,22 +22,29 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingCluster;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.ComponentState;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
@@ -47,7 +54,9 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +72,8 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class TestServiceAM extends ServiceTestUtils{
 
@@ -72,6 +83,9 @@ public class TestServiceAM extends ServiceTestUtils{
   private File basedir;
   YarnConfiguration conf = new YarnConfiguration();
   TestingCluster zkCluster;
+  @Rule
+  public ServiceTestUtils.ServiceFSWatcher rule =
+      new ServiceTestUtils.ServiceFSWatcher();
 
   @Before
   public void setup() throws Exception {
@@ -312,6 +326,80 @@ public class TestServiceAM extends ServiceTestUtils{
   }
 
   @Test
+  public void testContainerCompletedEventProcessed() throws Exception {
+    ServiceContext context = createServiceContext("abc");
+    MockServiceScheduler scheduler = new MockServiceScheduler(context);
+    scheduler.init(conf);
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 0);
+    ContainerStatus containerStatus1 = ContainerStatus.newInstance(containerId1,
+        org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+        "successful", 0);
+    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerStatus containerStatus2 = ContainerStatus.newInstance(containerId2,
+        org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+        "successful", 0);
+    ComponentInstance instance = Mockito.mock(ComponentInstance.class);
+    Mockito.doReturn("componentInstance").when(instance).getCompName();
+    scheduler.addLiveCompInstance(containerId2, instance);
+    List<ContainerStatus> statuses = new ArrayList<>();
+    // First container instance will be null
+    statuses.add(containerStatus1);
+    // Second container instance is added
+    scheduler.addLiveCompInstance(containerId2, instance);
+    statuses.add(containerStatus2);
+    scheduler.callbackHandler.onContainersCompleted(statuses);
+    // For second container event should be dispatched.
+    verify(scheduler.dispatcher, times(1)).getEventHandler();
+    DefaultMetricsSystem.shutdown();
+  }
+
+  private ServiceContext createServiceContext(String name)
+      throws Exception {
+    Artifact artifact = new Artifact();
+    artifact.setId("1");
+    artifact.setType(Artifact.TypeEnum.TARBALL);
+    Service serviceDef = ServiceTestUtils.createExampleApplication();
+    ApplicationId applicationId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    serviceDef.setId(applicationId.toString());
+    serviceDef.setName(name);
+    serviceDef.setState(ServiceState.STARTED);
+    serviceDef.getComponents().forEach(component ->
+        component.setArtifact(artifact));
+    ServiceContext context = new MockRunningServiceContext(rule,
+        serviceDef);
+    context.scheduler.getDispatcher().setDrainEventsOnStop();
+    context.scheduler.getDispatcher().start();
+    return context;
+  }
+
+  class MockServiceScheduler extends ServiceScheduler {
+    private AsyncDispatcher dispatcher;
+    private AMRMClientCallback callbackHandler = new AMRMClientCallback();
+
+    MockServiceScheduler(ServiceContext context) {
+      super(context);
+    }
+
+    @Override
+    protected AsyncDispatcher createAsyncDispatcher() {
+      dispatcher = Mockito.mock(AsyncDispatcher.class);
+      EventHandler<Event> handler = Mockito.mock(EventHandler.class);
+      Mockito.doReturn(handler).when(dispatcher).getEventHandler();
+      return dispatcher;
+    }
+
+    @Override
+    protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
+      return AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
+    }
+
+  }
+
+  @Test
   public void testRecordTokensForContainers() throws Exception {
     ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
     Service exampleApp = new Service();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org