You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/07/19 05:21:42 UTC

[apex-core] branch master updated: APEXCORE-742 Using a common utility method for creation of yarn client instances and fixing scenarios where the client is not being initialized correctly.

This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 0413f9b  APEXCORE-742 Using a common utility method for creation of yarn client instances and fixing scenarios where the client is not being initialized correctly.
0413f9b is described below

commit 0413f9b5bd19a2e5a46c565a7815f1e5c2f7a57f
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Mon Jun 5 17:24:03 2017 -0700

    APEXCORE-742 Using a common utility method for creation of yarn client instances and fixing scenarios where the client is not being initialized correctly.
---
 .../stram/StreamingAppMasterService.java           | 539 ++++++++++-----------
 .../java/com/datatorrent/stram/cli/ApexCli.java    |   6 +-
 .../com/datatorrent/stram/client/StramAgent.java   |   8 +-
 .../datatorrent/stram/client/StramClientUtils.java |   8 +
 .../datatorrent/stram/security/StramUserLogin.java |  14 +-
 .../apache/apex/engine/YarnAppLauncherImpl.java    |  30 +-
 6 files changed, 300 insertions(+), 305 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 5030a32..63080bb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -732,269 +732,265 @@ public class StreamingAppMasterService extends CompositeService
     ResourceRequestHandler resourceRequestor = System.getenv().containsKey("CDH_HADOOP_BIN") ? new BlacklistBasedResourceRequestHandler() : new ResourceRequestHandler();
 
     List<ContainerStartRequest> pendingContainerStartRequests = new LinkedList<>();
-    YarnClient clientRMService = YarnClient.createYarnClient();
+    try (YarnClient clientRMService = StramClientUtils.createYarnClient(conf)) {
 
-    try {
-      // YARN-435
-      // we need getClusterNodes to populate the initial node list,
-      // subsequent updates come through the heartbeat response
-      clientRMService.init(conf);
-      clientRMService.start();
-
-      ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), dag.getAttributes().get(DAG.APPLICATION_ID));
-      if (ar != null) {
-        appDone = true;
-        dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
-            ar.getApplicationId().toString(), ar.getName(), ar.getUser());
-        LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
-        finishApplication(FinalApplicationStatus.FAILED);
-        return;
+      try {
+        // YARN-435
+        // we need getClusterNodes to populate the initial node list,
+        // subsequent updates come through the heartbeat response
+
+        ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), dag.getAttributes().get(DAG.APPLICATION_ID));
+        if (ar != null) {
+          appDone = true;
+          dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
+              ar.getApplicationId().toString(), ar.getName(), ar.getUser());
+          LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
+          finishApplication(FinalApplicationStatus.FAILED);
+          return;
+        }
+        resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
+        nodeReportUpdateTime = System.currentTimeMillis() + UPDATE_NODE_REPORTS_INTERVAL;
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
       }
-      resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
-      nodeReportUpdateTime = System.currentTimeMillis() + UPDATE_NODE_REPORTS_INTERVAL;
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
-    } finally {
-      clientRMService.stop();
-    }
 
-    List<Container> containers = response.getContainersFromPreviousAttempts();
+      List<Container> containers = response.getContainersFromPreviousAttempts();
 
-    // Running containers might take a while to register with the new app master and send the heartbeat signal.
-    int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
+      // Running containers might take a while to register with the new app master and send the heartbeat signal.
+      int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
 
-    List<ContainerId> releasedContainers = previouslyAllocatedContainers(containers);
-    FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
-    final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_PORT);
+      List<ContainerId> releasedContainers = previouslyAllocatedContainers(containers);
+      FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+      final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_PORT);
 
-    while (!appDone) {
-      loopCounter++;
-      final long currentTimeMillis = System.currentTimeMillis();
+      while (!appDone) {
+        loopCounter++;
+        final long currentTimeMillis = System.currentTimeMillis();
 
-      if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
-        String applicationId = appAttemptID.getApplicationId().toString();
-        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
-      }
+        if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
+          String applicationId = appAttemptID.getApplicationId().toString();
+          expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
+        }
 
-      if (currentTimeMillis > nodeReportUpdateTime) {
-        resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
-        nodeReportUpdateTime = currentTimeMillis + UPDATE_NODE_REPORTS_INTERVAL;
-      }
+        if (currentTimeMillis > nodeReportUpdateTime) {
+          resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
+          nodeReportUpdateTime = currentTimeMillis + UPDATE_NODE_REPORTS_INTERVAL;
+        }
 
-      Runnable r;
-      while ((r = this.pendingTasks.poll()) != null) {
-        r.run();
-      }
+        Runnable r;
+        while ((r = this.pendingTasks.poll()) != null) {
+          r.run();
+        }
 
-      // log current state
-      /*
-       * LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" +
-       * numTotalContainers + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers +
-       * ", failed=" + numFailedContainers + ", currentAllocated=" + this.allAllocatedContainers.size());
-       */
-      // Sleep before each loop when asking RM for containers
-      // to avoid flooding RM with spurious requests when it
-      // need not have any available containers
-      try {
-        sleep(1000);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted " + e.getMessage());
-      }
+        // log current state
+        /*
+         * LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" +
+         * numTotalContainers + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers +
+         * ", failed=" + numFailedContainers + ", currentAllocated=" + this.allAllocatedContainers.size());
+         */
+        // Sleep before each loop when asking RM for containers
+        // to avoid flooding RM with spurious requests when it
+        // need not have any available containers
+        try {
+          sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted", e);
+        }
 
-      // Setup request to be sent to RM to allocate containers
-      List<ContainerRequest> containerRequests = new ArrayList<>();
-      List<ContainerRequest> removedContainerRequests = new ArrayList<>();
-
-      // request containers for pending deploy requests
-      if (!dnmgr.containerStartRequests.isEmpty()) {
-        StreamingContainerAgent.ContainerStartRequest csr;
-        while ((csr = dnmgr.containerStartRequests.poll()) != null) {
-          if (csr.container.getRequiredMemoryMB() > maxMem) {
-            LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", csr.container.getRequiredMemoryMB(), maxMem);
-            csr.container.setRequiredMemoryMB(maxMem);
-          }
-          if (csr.container.getRequiredMemoryMB() < minMem) {
-            csr.container.setRequiredMemoryMB(minMem);
-          }
-          if (csr.container.getRequiredVCores() > maxVcores) {
-            LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", csr.container.getRequiredVCores(), maxVcores);
-            csr.container.setRequiredVCores(maxVcores);
-          }
-          if (csr.container.getRequiredVCores() < minVcores) {
-            csr.container.setRequiredVCores(minVcores);
-          }
-          csr.container.setResourceRequestPriority(nextRequestPriority++);
-          ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
-          if (cr == null) {
-            pendingContainerStartRequests.add(csr);
-          } else {
-            resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
+        // Setup request to be sent to RM to allocate containers
+        List<ContainerRequest> containerRequests = new ArrayList<>();
+        List<ContainerRequest> removedContainerRequests = new ArrayList<>();
+
+        // request containers for pending deploy requests
+        if (!dnmgr.containerStartRequests.isEmpty()) {
+          StreamingContainerAgent.ContainerStartRequest csr;
+          while ((csr = dnmgr.containerStartRequests.poll()) != null) {
+            if (csr.container.getRequiredMemoryMB() > maxMem) {
+              LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", csr.container.getRequiredMemoryMB(), maxMem);
+              csr.container.setRequiredMemoryMB(maxMem);
+            }
+            if (csr.container.getRequiredMemoryMB() < minMem) {
+              csr.container.setRequiredMemoryMB(minMem);
+            }
+            if (csr.container.getRequiredVCores() > maxVcores) {
+              LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", csr.container.getRequiredVCores(), maxVcores);
+              csr.container.setRequiredVCores(maxVcores);
+            }
+            if (csr.container.getRequiredVCores() < minVcores) {
+              csr.container.setRequiredVCores(minVcores);
+            }
+            csr.container.setResourceRequestPriority(nextRequestPriority++);
+            ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
+            if (cr == null) {
+              pendingContainerStartRequests.add(csr);
+            } else {
+              resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
+            }
           }
         }
-      }
 
-      // If all other requests are allocated, retry pending requests which need host availability
-      if (containerRequests.isEmpty() && !pendingContainerStartRequests.isEmpty()) {
-        List<ContainerStartRequest> removalList = new LinkedList<>();
-        for (ContainerStartRequest csr : pendingContainerStartRequests) {
-          ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
-          if (cr != null) {
-            resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
-            removalList.add(csr);
+        // If all other requests are allocated, retry pending requests which need host availability
+        if (containerRequests.isEmpty() && !pendingContainerStartRequests.isEmpty()) {
+          List<ContainerStartRequest> removalList = new LinkedList<>();
+          for (ContainerStartRequest csr : pendingContainerStartRequests) {
+            ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
+            if (cr != null) {
+              resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
+              removalList.add(csr);
+            }
           }
+          pendingContainerStartRequests.removeAll(removalList);
         }
-        pendingContainerStartRequests.removeAll(removalList);
-      }
 
-      resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
+        resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
 
       /* Remove nodes from blacklist after timeout */
-      List<String> blacklistRemovals = new ArrayList<>();
-      for (String hostname : failedBlackListedNodes) {
-        Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
-        if (timeDiff >= blacklistRemovalTime) {
-          blacklistRemovals.add(hostname);
-          failedContainerNodesMap.remove(hostname);
+        List<String> blacklistRemovals = new ArrayList<>();
+        for (String hostname : failedBlackListedNodes) {
+          Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
+          if (timeDiff >= blacklistRemovalTime) {
+            blacklistRemovals.add(hostname);
+            failedContainerNodesMap.remove(hostname);
+          }
+        }
+        if (!blacklistRemovals.isEmpty()) {
+          amRmClient.updateBlacklist(null, blacklistRemovals);
+          LOG.info("Removing nodes {} from blacklist: time elapsed since last blacklisting due to failure is greater than specified timeout", blacklistRemovals.toString());
+          failedBlackListedNodes.removeAll(blacklistRemovals);
         }
-      }
-      if (!blacklistRemovals.isEmpty()) {
-        amRmClient.updateBlacklist(null, blacklistRemovals);
-        LOG.info("Removing nodes {} from blacklist: time elapsed since last blacklisting due to failure is greater than specified timeout", blacklistRemovals.toString());
-        failedBlackListedNodes.removeAll(blacklistRemovals);
-      }
 
-      numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
-      AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
-      if (amResp.getAMCommand() != null) {
-        LOG.info(" statement executed:{}", amResp.getAMCommand());
-        switch (amResp.getAMCommand()) {
-          case AM_RESYNC:
-          case AM_SHUTDOWN:
-            throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
-          default:
-            throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
+        numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
+        AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
+        if (amResp.getAMCommand() != null) {
+          LOG.info(" statement executed:{}", amResp.getAMCommand());
+          switch (amResp.getAMCommand()) {
+            case AM_RESYNC:
+            case AM_SHUTDOWN:
+              throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
+            default:
+              throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
 
-        }
-      }
-      releasedContainers.clear();
-
-      // Retrieve list of allocated containers from the response
-      List<Container> newAllocatedContainers = amResp.getAllocatedContainers();
-      // LOG.info("Got response from RM for container ask, allocatedCnt=" + newAllocatedContainers.size());
-      numRequestedContainers -= newAllocatedContainers.size();
-      long timestamp = System.currentTimeMillis();
-      for (Container allocatedContainer : newAllocatedContainers) {
-
-        LOG.info("Got new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", priority" + allocatedContainer.getPriority());
-        // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
-
-        boolean alreadyAllocated = true;
-        StreamingContainerAgent.ContainerStartRequest csr = null;
-        for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
-          if (entry.getKey().container.getResourceRequestPriority() == allocatedContainer.getPriority().getPriority()) {
-            alreadyAllocated = false;
-            csr = entry.getKey();
-            break;
           }
         }
+        releasedContainers.clear();
+
+        // Retrieve list of allocated containers from the response
+        List<Container> newAllocatedContainers = amResp.getAllocatedContainers();
+        // LOG.info("Got response from RM for container ask, allocatedCnt=" + newAllocatedContainers.size());
+        numRequestedContainers -= newAllocatedContainers.size();
+        long timestamp = System.currentTimeMillis();
+        for (Container allocatedContainer : newAllocatedContainers) {
+
+          LOG.info("Got new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", priority" + allocatedContainer.getPriority());
+          // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
+
+          boolean alreadyAllocated = true;
+          StreamingContainerAgent.ContainerStartRequest csr = null;
+          for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
+            if (entry.getKey().container.getResourceRequestPriority() == allocatedContainer.getPriority().getPriority()) {
+              alreadyAllocated = false;
+              csr = entry.getKey();
+              break;
+            }
+          }
 
-        if (alreadyAllocated) {
-          LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
-          releasedContainers.add(allocatedContainer.getId());
-          numReleasedContainers++;
-          numRequestedContainers--;
-          continue;
-        }
-        if (csr != null) {
-          requestedResources.remove(csr);
-        }
+          if (alreadyAllocated) {
+            LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
+            releasedContainers.add(allocatedContainer.getId());
+            numReleasedContainers++;
+            numRequestedContainers--;
+            continue;
+          }
+          if (csr != null) {
+            requestedResources.remove(csr);
+          }
 
-        // allocate resource to container
-        ContainerResource resource = new ContainerResource(allocatedContainer.getPriority().getPriority(), allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString(), allocatedContainer.getResource().getMemory(), allocatedContainer.getResource().getVirtualCores(), allocatedContainer.getNodeHttpAddress());
-        StreamingContainerAgent sca = dnmgr.assignContainer(resource, null);
+          // allocate resource to container
+          ContainerResource resource = new ContainerResource(allocatedContainer.getPriority().getPriority(), allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString(), allocatedContainer.getResource().getMemory(), allocatedContainer.getResource().getVirtualCores(), allocatedContainer.getNodeHttpAddress());
+          StreamingContainerAgent sca = dnmgr.assignContainer(resource, null);
 
-        if (sca == null) {
-          // allocated container no longer needed, add release request
-          LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
-          releasedContainers.add(allocatedContainer.getId());
-        } else {
-          AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
-          this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
-          ByteBuffer tokens = null;
-          if (UserGroupInformation.isSecurityEnabled()) {
-            UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-            Token<StramDelegationTokenIdentifier> delegationToken = allocateDelegationToken(ugi.getUserName(), heartbeatListener.getAddress());
-            allocatedContainerHolder.delegationToken = delegationToken;
-            //ByteBuffer tokens = LaunchContainerRunnable.getTokens(delegationTokenManager, heartbeatListener.getAddress());
-            tokens = LaunchContainerRunnable.getTokens(ugi, delegationToken);
+          if (sca == null) {
+            // allocated container no longer needed, add release request
+            LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
+            releasedContainers.add(allocatedContainer.getId());
+          } else {
+            AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
+            this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
+            ByteBuffer tokens = null;
+            if (UserGroupInformation.isSecurityEnabled()) {
+              UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+              Token<StramDelegationTokenIdentifier> delegationToken = allocateDelegationToken(ugi.getUserName(), heartbeatListener.getAddress());
+              allocatedContainerHolder.delegationToken = delegationToken;
+              //ByteBuffer tokens = LaunchContainerRunnable.getTokens(delegationTokenManager, heartbeatListener.getAddress());
+              tokens = LaunchContainerRunnable.getTokens(ugi, delegationToken);
+            }
+            LaunchContainerRunnable launchContainer = new LaunchContainerRunnable(allocatedContainer, nmClient, sca, tokens);
+            // Thread launchThread = new Thread(runnableLaunchContainer);
+            // launchThreads.add(launchThread);
+            // launchThread.start();
+            launchContainer.run(); // communication with NMs is now async
+
+            // record container start event
+            StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
+                allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString()));
+            ev.setTimestamp(timestamp);
+            dnmgr.recordEventAsync(ev);
           }
-          LaunchContainerRunnable launchContainer = new LaunchContainerRunnable(allocatedContainer, nmClient, sca, tokens);
-          // Thread launchThread = new Thread(runnableLaunchContainer);
-          // launchThreads.add(launchThread);
-          // launchThread.start();
-          launchContainer.run(); // communication with NMs is now async
-
-          // record container start event
-          StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
-              allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString()));
-          ev.setTimestamp(timestamp);
-          dnmgr.recordEventAsync(ev);
         }
-      }
 
-      // track node updates for future locality constraint allocations
-      // TODO: it seems 2.0.4-alpha doesn't give us any updates
-      resourceRequestor.updateNodeReports(amResp.getUpdatedNodes());
+        // track node updates for future locality constraint allocations
+        // TODO: it seems 2.0.4-alpha doesn't give us any updates
+        resourceRequestor.updateNodeReports(amResp.getUpdatedNodes());
 
-      // Check the completed containers
-      List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
-      // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
-      List<String> blacklistAdditions = new ArrayList<>();
-      for (ContainerStatus containerStatus : completedContainers) {
-        LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
+        // Check the completed containers
+        List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
+        // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+        List<String> blacklistAdditions = new ArrayList<>();
+        for (ContainerStatus containerStatus : completedContainers) {
+          LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
 
-        // non complete containers should not be here
-        assert (containerStatus.getState() == ContainerState.COMPLETE);
+          // non complete containers should not be here
+          assert (containerStatus.getState() == ContainerState.COMPLETE);
 
-        AllocatedContainer allocatedContainer = allocatedContainers.remove(containerStatus.getContainerId().toString());
-        if (allocatedContainer != null && allocatedContainer.delegationToken != null) {
-          UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-          delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
-        }
-        EventGroupId groupId = null;
-        int exitStatus = containerStatus.getExitStatus();
-        if (0 != exitStatus) {
-          if (allocatedContainer != null) {
-            numFailedContainers.incrementAndGet();
-            if (exitStatus != 1 && maxConsecutiveContainerFailures != Integer.MAX_VALUE) {
-              // If container failure due to framework
-              String hostname = allocatedContainer.container.getNodeId().getHost();
-              if (!failedBlackListedNodes.contains(hostname)) {
-                // Blacklist the node if not already blacklisted
-                if (failedContainerNodesMap.containsKey(hostname)) {
-                  NodeFailureStats stats = failedContainerNodesMap.get(hostname);
-                  long timeStamp = System.currentTimeMillis();
-                  if (timeStamp - stats.lastFailureTimeStamp  >= blacklistRemovalTime) {
-                    // Reset failure count if last failure was before Blacklist removal time
-                    stats.failureCount = 1;
-                    stats.lastFailureTimeStamp = timeStamp;
-                  } else {
-                    stats.lastFailureTimeStamp = timeStamp;
-                    stats.failureCount++;
-                    if (stats.failureCount >= maxConsecutiveContainerFailures) {
-                      LOG.info("Node {} failed {} times consecutively within {} minutes, marking the node blacklisted", hostname, stats.failureCount, blacklistRemovalTime / (60 * 1000));
-                      blacklistAdditions.add(hostname);
-                      failedBlackListedNodes.add(hostname);
+          AllocatedContainer allocatedContainer = allocatedContainers.remove(containerStatus.getContainerId().toString());
+          if (allocatedContainer != null && allocatedContainer.delegationToken != null) {
+            UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+            delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
+          }
+          EventGroupId groupId = null;
+          int exitStatus = containerStatus.getExitStatus();
+          if (0 != exitStatus) {
+            if (allocatedContainer != null) {
+              numFailedContainers.incrementAndGet();
+              if (exitStatus != 1 && maxConsecutiveContainerFailures != Integer.MAX_VALUE) {
+                // If container failure due to framework
+                String hostname = allocatedContainer.container.getNodeId().getHost();
+                if (!failedBlackListedNodes.contains(hostname)) {
+                  // Blacklist the node if not already blacklisted
+                  if (failedContainerNodesMap.containsKey(hostname)) {
+                    NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+                    long timeStamp = System.currentTimeMillis();
+                    if (timeStamp - stats.lastFailureTimeStamp >= blacklistRemovalTime) {
+                      // Reset failure count if last failure was before Blacklist removal time
+                      stats.failureCount = 1;
+                      stats.lastFailureTimeStamp = timeStamp;
+                    } else {
+                      stats.lastFailureTimeStamp = timeStamp;
+                      stats.failureCount++;
+                      if (stats.failureCount >= maxConsecutiveContainerFailures) {
+                        LOG.info("Node {} failed {} times consecutively within {} minutes, marking the node blacklisted", hostname, stats.failureCount, blacklistRemovalTime / (60 * 1000));
+                        blacklistAdditions.add(hostname);
+                        failedBlackListedNodes.add(hostname);
+                      }
                     }
+                  } else {
+                    failedContainerNodesMap.put(hostname, new NodeFailureStats(System.currentTimeMillis(), 1));
                   }
-                } else {
-                  failedContainerNodesMap.put(hostname, new NodeFailureStats(System.currentTimeMillis(), 1));
                 }
               }
             }
-          }
 //          if (exitStatus == 1) {
 //            // non-recoverable StreamingContainer failure
 //            appDone = true;
@@ -1003,62 +999,63 @@ public class StreamingAppMasterService extends CompositeService
 //            LOG.info("Exiting due to: {}", dnmgr.shutdownDiagnosticsMessage);
 //          }
 //          else {
-          // Recoverable failure or process killed (externally or via stop request by AM)
-          // also occurs when a container was released by the application but never assigned/launched
-          LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
-          String containerIdStr = containerStatus.getContainerId().toString();
-          dnmgr.scheduleContainerRestart(containerIdStr);
-          groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr);
+            // Recoverable failure or process killed (externally or via stop request by AM)
+            // also occurs when a container was released by the application but never assigned/launched
+            LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
+            String containerIdStr = containerStatus.getContainerId().toString();
+            dnmgr.scheduleContainerRestart(containerIdStr);
+            groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr);
 //          }
-        } else {
-          // container completed successfully
-          numCompletedContainers.incrementAndGet();
-          LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
-          // Reset counter for node failure, if exists
-          String hostname = allocatedContainer.container.getNodeId().getHost();
-          NodeFailureStats stats = failedContainerNodesMap.get(hostname);
-          if (stats != null) {
-            stats.failureCount = 0;
+          } else {
+            // container completed successfully
+            numCompletedContainers.incrementAndGet();
+            LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+            // Reset counter for node failure, if exists
+            String hostname = allocatedContainer.container.getNodeId().getHost();
+            NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+            if (stats != null) {
+              stats.failureCount = 0;
+            }
           }
-        }
 
-        String containerIdStr = containerStatus.getContainerId().toString();
-        dnmgr.removeContainerAgent(containerIdStr);
+          String containerIdStr = containerStatus.getContainerId().toString();
+          dnmgr.removeContainerAgent(containerIdStr);
 
-        // record container stop event
-        StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(), groupId);
-        ev.setReason(containerStatus.getDiagnostics());
-        dnmgr.recordEventAsync(ev);
-      }
+          // record container stop event
+          StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(), groupId);
+          ev.setReason(containerStatus.getDiagnostics());
+          dnmgr.recordEventAsync(ev);
+        }
 
-      if (!blacklistAdditions.isEmpty()) {
-        amRmClient.updateBlacklist(blacklistAdditions, null);
-        long timeStamp = System.currentTimeMillis();
-        for (String hostname : blacklistAdditions) {
-          NodeFailureStats stats = failedContainerNodesMap.get(hostname);
-          stats.blackListAdditionTime = timeStamp;
+        if (!blacklistAdditions.isEmpty()) {
+          amRmClient.updateBlacklist(blacklistAdditions, null);
+          long timeStamp = System.currentTimeMillis();
+          for (String hostname : blacklistAdditions) {
+            NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+            stats.blackListAdditionTime = timeStamp;
+          }
+        }
+        if (dnmgr.forcedShutdown) {
+          LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
+          finalStatus = FinalApplicationStatus.FAILED;
+          appDone = true;
+        } else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
+          LOG.debug("Exiting as no more containers are allocated or requested");
+          finalStatus = FinalApplicationStatus.SUCCEEDED;
+          appDone = true;
         }
-      }
-      if (dnmgr.forcedShutdown) {
-        LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
-        finalStatus = FinalApplicationStatus.FAILED;
-        appDone = true;
-      } else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
-        LOG.debug("Exiting as no more containers are allocated or requested");
-        finalStatus = FinalApplicationStatus.SUCCEEDED;
-        appDone = true;
-      }
 
-      LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
-          loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
+        LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
+            loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
 
-      // monitor child containers
-      dnmgr.monitorHeartbeat(waitForRecovery > 0);
+        // monitor child containers
+        dnmgr.monitorHeartbeat(waitForRecovery > 0);
 
-      waitForRecovery = Math.max(waitForRecovery - 1, 0);
-    }
+        waitForRecovery = Math.max(waitForRecovery - 1, 0);
+      }
 
-    finishApplication(finalStatus);
+      finishApplication(finalStatus);
+    }
   }
 
   private void finishApplication(FinalApplicationStatus finalStatus) throws YarnException, IOException
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 152e6f1..903dad2 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -169,7 +169,7 @@ public class ApexCli
   protected Configuration conf;
   private FileSystem fs;
   private StramAgent stramAgent;
-  private final YarnClient yarnClient = YarnClient.createYarnClient();
+  private YarnClient yarnClient = null;
   private ApplicationReport currentApp = null;
   private boolean consolePresent;
   private String[] commandsToExecute;
@@ -1208,8 +1208,7 @@ public class ApexCli
     fs = StramClientUtils.newFileSystemInstance(conf);
     stramAgent = new StramAgent(fs, conf);
 
-    yarnClient.init(conf);
-    yarnClient.start();
+    yarnClient = StramClientUtils.createYarnClient(conf);
     LOG.debug("Yarn Client initialized and started");
     String socks = conf.get(CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY);
     if (socks != null) {
@@ -2402,6 +2401,7 @@ public class ApexCli
           LOG.warn("Cannot flush command history");
         }
       }
+      yarnClient.stop();
       System.exit(0);
     }
 
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
index a1ac8ca..ce7507d 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
@@ -331,12 +331,8 @@ public class StramAgent extends FSAgent
 
   private StramWebServicesInfo retrieveWebServicesInfo(String appId)
   {
-    YarnClient yarnClient = YarnClient.createYarnClient();
     String url;
-    try {
-      yarnClient.init(conf);
-      yarnClient.start();
-
+    try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
       ApplicationReport ar = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
       if (ar == null) {
         LOG.warn("YARN does not have record for this application {}", appId);
@@ -364,8 +360,6 @@ public class StramAgent extends FSAgent
     } catch (Exception ex) {
       LOG.error("Cannot retrieve web services info", ex);
       return null;
-    } finally {
-      yarnClient.stop();
     }
 
     WebServicesClient webServicesClient = new WebServicesClient();
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d8caa1e..a310ee2 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -317,6 +317,14 @@ public class StramClientUtils
 
   }
 
+  public static YarnClient createYarnClient(Configuration conf)
+  {
+    YarnClient client = YarnClient.createYarnClient();
+    client.init(conf);
+    client.start();
+    return client;
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class);
 
   public static String getHostName()
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 83aa781..71eb825 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -110,22 +110,14 @@ public class StramUserLogin
         public Object run() throws Exception
         {
 
-          YarnClient yarnClient = null;
-          if (renewRMToken) {
-            yarnClient = YarnClient.createYarnClient();
-            yarnClient.init(conf);
-            yarnClient.start();
-          }
           Credentials creds = new Credentials();
           try (FileSystem fs1 = FileSystem.newInstance(conf)) {
             fs1.addDelegationTokens(tokenRenewer, creds);
-            if (renewRMToken) {
+          }
+          if (renewRMToken) {
+            try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
               new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
             }
-          } finally {
-            if (renewRMToken) {
-              yarnClient.stop();
-            }
           }
           credentials.addAll(creds);
 
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 3c49a71..9a69b08 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -24,7 +24,6 @@ import java.util.Map;
 
 import org.apache.apex.api.YarnAppLauncher;
 import org.apache.apex.engine.util.StreamingAppFactory;
-import org.apache.bval.jsr303.util.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -37,6 +36,7 @@ import com.google.common.base.Throwables;
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 
@@ -80,7 +80,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
         }
       };
       ApplicationId appId = appLauncher.launchApp(appFactory);
-      return new YarnAppHandleImpl(appId);
+      return new YarnAppHandleImpl(appId, conf);
     } catch (Exception ex) {
       throw new LauncherException(ex);
     }
@@ -89,18 +89,15 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
   protected void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
   {
     if (shutdownMode == ShutdownMode.KILL) {
-      YarnClient yarnClient = YarnClient.createYarnClient();
       try {
         ApplicationId applicationId = app.appId;
-        ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+        ApplicationReport appReport = app.yarnClient.getApplicationReport(applicationId);
         if (appReport == null) {
           throw new LauncherException("Application " + app.getApplicationId() + " not found");
         }
-        yarnClient.killApplication(applicationId);
+        app.yarnClient.killApplication(applicationId);
       } catch (YarnException | IOException e) {
         throw Throwables.propagate(e);
-      } finally {
-        IOUtils.closeQuietly(yarnClient);
       }
     } else {
       throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead");
@@ -124,13 +121,15 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
     }
   }
 
-  public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
+  public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle, AutoCloseable
   {
     final ApplicationId appId;
+    private final YarnClient yarnClient;
 
-    public YarnAppHandleImpl(ApplicationId appId)
+    public YarnAppHandleImpl(ApplicationId appId, Configuration conf)
     {
       this.appId = appId;
+      this.yarnClient = StramClientUtils.createYarnClient(conf);
     }
 
     @Override
@@ -142,7 +141,6 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
     @Override
     public boolean isFinished()
     {
-      YarnClient yarnClient = YarnClient.createYarnClient();
       try {
         ApplicationReport appReport = yarnClient.getApplicationReport(appId);
         if (appReport != null) {
@@ -154,8 +152,6 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
         return true;
       } catch (YarnException | IOException e) {
         throw Throwables.propagate(e);
-      } finally {
-        IOUtils.closeQuietly(yarnClient);
       }
     }
 
@@ -164,8 +160,16 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
         throws org.apache.apex.api.Launcher.LauncherException
     {
       shutdownApp(this, shutdownMode);
-
     }
 
+    @Override
+    public void close() throws Exception
+    {
+      // Call close() rather than stop() on the YarnClient: this method is itself normally invoked through
+      // AutoCloseable handling (e.g. try-with-resources), so delegating to the client's close() keeps that
+      // pattern consistent. The effect should be the same as calling stop(), since YarnClient's close() is
+      // documented as a call to stop().
+      yarnClient.close();
+    }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].