You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/07 00:46:24 UTC

[incubator-gobblin] branch master updated: [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2228f27  [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
2228f27 is described below

commit 2228f2783f760cac24e9c6a66e35ee517316504c
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Mar 6 16:46:17 2020 -0800

    [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
    
    Closes #2911 from ZihanLi58/GOBBLIN-1069-new
---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 60 ++++++++++------------
 1 file changed, 27 insertions(+), 33 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index ff1f3ba..6476c99 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -620,11 +620,7 @@ public class YarnService extends AbstractIdleService {
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
     Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
-    if (completedContainerEntry == null) {
-      //No map for this container means we don't maintain this container, directly return
-      return;
-    }
-    String completedInstanceName = completedContainerEntry.getValue();
+    String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
 
     LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus()));
@@ -643,42 +639,40 @@ public class YarnService extends AbstractIdleService {
     if (this.shutdownInProgress) {
       return;
     }
+    if(completedContainerEntry != null) {
+      this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
+      int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
 
-    this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
-    int retryCount =
-    	 this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
-
-    // Populate event metadata
-    Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent();
-    if (this.eventSubmitter.isPresent()) {
-      eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
-      eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
-      eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + "");
-    }
-
-    if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
+      // Populate event metadata
+      Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent();
       if (this.eventSubmitter.isPresent()) {
-        this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
-            eventMetadataBuilder.get().build());
+        eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
+        eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
+        eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + "");
       }
 
-      LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName);
-      return;
-    }
+      if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
+        if (this.eventSubmitter.isPresent()) {
+          this.eventSubmitter.get()
+              .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
+        }
 
-    // Add the Helix instance name of the completed container to the queue of unused
-    // instance names so they can be reused by a replacement container.
-    this.unusedHelixInstanceNames.offer(completedInstanceName);
+        LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName);
+        return;
+      }
 
-    if (this.eventSubmitter.isPresent()) {
-      this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
-          eventMetadataBuilder.get().build());
-    }
+      // Add the Helix instance name of the completed container to the queue of unused
+      // instance names so they can be reused by a replacement container.
+      this.unusedHelixInstanceNames.offer(completedInstanceName);
 
-    LOGGER.info(String.format("Requesting a new container to replace %s to run Helix instance %s",
-        containerStatus.getContainerId(), completedInstanceName));
+      if (this.eventSubmitter.isPresent()) {
+        this.eventSubmitter.get()
+            .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
+      }
+    }
+    LOGGER.info(String.format("Requesting a new container to replace %s to run Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
     this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) ?
+        shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerEntry != null ?
             Optional.of(completedContainerEntry.getKey()) : Optional.<Container>absent()));
   }