You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/07 00:46:24 UTC
[incubator-gobblin] branch master updated: [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2228f27 [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
2228f27 is described below
commit 2228f2783f760cac24e9c6a66e35ee517316504c
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Mar 6 16:46:17 2020 -0800
[Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
Closes #2911 from ZihanLi58/GOBBLIN-1069-new
---
.../java/org/apache/gobblin/yarn/YarnService.java | 60 ++++++++++------------
1 file changed, 27 insertions(+), 33 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index ff1f3ba..6476c99 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -620,11 +620,7 @@ public class YarnService extends AbstractIdleService {
*/
protected void handleContainerCompletion(ContainerStatus containerStatus) {
Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
- if (completedContainerEntry == null) {
- //No map for this container means we don't maintain this container, directly return
- return;
- }
- String completedInstanceName = completedContainerEntry.getValue();
+ String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus()));
@@ -643,42 +639,40 @@ public class YarnService extends AbstractIdleService {
if (this.shutdownInProgress) {
return;
}
+ if(completedContainerEntry != null) {
+ this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
+ int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
- this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
- int retryCount =
- this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
-
- // Populate event metadata
- Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent();
- if (this.eventSubmitter.isPresent()) {
- eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
- eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
- eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + "");
- }
-
- if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
+ // Populate event metadata
+ Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent();
if (this.eventSubmitter.isPresent()) {
- this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
- eventMetadataBuilder.get().build());
+ eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
+ eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
+ eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + "");
}
- LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName);
- return;
- }
+ if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
+ if (this.eventSubmitter.isPresent()) {
+ this.eventSubmitter.get()
+ .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
+ }
- // Add the Helix instance name of the completed container to the queue of unused
- // instance names so they can be reused by a replacement container.
- this.unusedHelixInstanceNames.offer(completedInstanceName);
+ LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName);
+ return;
+ }
- if (this.eventSubmitter.isPresent()) {
- this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
- eventMetadataBuilder.get().build());
- }
+ // Add the Helix instance name of the completed container to the queue of unused
+ // instance names so they can be reused by a replacement container.
+ this.unusedHelixInstanceNames.offer(completedInstanceName);
- LOGGER.info(String.format("Requesting a new container to replace %s to run Helix instance %s",
- containerStatus.getContainerId(), completedInstanceName));
+ if (this.eventSubmitter.isPresent()) {
+ this.eventSubmitter.get()
+ .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
+ }
+ }
+ LOGGER.info(String.format("Requesting a new container to replace %s to run Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
this.eventBus.post(new NewContainerRequest(
- shouldStickToTheSameNode(containerStatus.getExitStatus()) ?
+ shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerEntry != null ?
Optional.of(completedContainerEntry.getKey()) : Optional.<Container>absent()));
}