You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/07/19 05:21:42 UTC
[apex-core] branch master updated: APEXCORE-742 Using a common
utility method for creation of yarn client instances and fixing scenarios
where the client is not being initialized correctly.
This is an automated email from the ASF dual-hosted git repository.
vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new 0413f9b APEXCORE-742 Using a common utility method for creation of yarn client instances and fixing scenarios where the client is not being initialized correctly.
0413f9b is described below
commit 0413f9b5bd19a2e5a46c565a7815f1e5c2f7a57f
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Mon Jun 5 17:24:03 2017 -0700
APEXCORE-742 Using a common utility method for creation of yarn client instances and fixing scenarios where the client is not being initialized correctly.
---
.../stram/StreamingAppMasterService.java | 539 ++++++++++-----------
.../java/com/datatorrent/stram/cli/ApexCli.java | 6 +-
.../com/datatorrent/stram/client/StramAgent.java | 8 +-
.../datatorrent/stram/client/StramClientUtils.java | 8 +
.../datatorrent/stram/security/StramUserLogin.java | 14 +-
.../apache/apex/engine/YarnAppLauncherImpl.java | 30 +-
6 files changed, 300 insertions(+), 305 deletions(-)
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 5030a32..63080bb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -732,269 +732,265 @@ public class StreamingAppMasterService extends CompositeService
ResourceRequestHandler resourceRequestor = System.getenv().containsKey("CDH_HADOOP_BIN") ? new BlacklistBasedResourceRequestHandler() : new ResourceRequestHandler();
List<ContainerStartRequest> pendingContainerStartRequests = new LinkedList<>();
- YarnClient clientRMService = YarnClient.createYarnClient();
+ try (YarnClient clientRMService = StramClientUtils.createYarnClient(conf)) {
- try {
- // YARN-435
- // we need getClusterNodes to populate the initial node list,
- // subsequent updates come through the heartbeat response
- clientRMService.init(conf);
- clientRMService.start();
-
- ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), dag.getAttributes().get(DAG.APPLICATION_ID));
- if (ar != null) {
- appDone = true;
- dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
- ar.getApplicationId().toString(), ar.getName(), ar.getUser());
- LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
- finishApplication(FinalApplicationStatus.FAILED);
- return;
+ try {
+ // YARN-435
+ // we need getClusterNodes to populate the initial node list,
+ // subsequent updates come through the heartbeat response
+
+ ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), dag.getAttributes().get(DAG.APPLICATION_ID));
+ if (ar != null) {
+ appDone = true;
+ dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
+ ar.getApplicationId().toString(), ar.getName(), ar.getUser());
+ LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
+ finishApplication(FinalApplicationStatus.FAILED);
+ return;
+ }
+ resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
+ nodeReportUpdateTime = System.currentTimeMillis() + UPDATE_NODE_REPORTS_INTERVAL;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
}
- resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
- nodeReportUpdateTime = System.currentTimeMillis() + UPDATE_NODE_REPORTS_INTERVAL;
- } catch (Exception e) {
- throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
- } finally {
- clientRMService.stop();
- }
- List<Container> containers = response.getContainersFromPreviousAttempts();
+ List<Container> containers = response.getContainersFromPreviousAttempts();
- // Running containers might take a while to register with the new app master and send the heartbeat signal.
- int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
+ // Running containers might take a while to register with the new app master and send the heartbeat signal.
+ int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
- List<ContainerId> releasedContainers = previouslyAllocatedContainers(containers);
- FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
- final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
+ List<ContainerId> releasedContainers = previouslyAllocatedContainers(containers);
+ FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+ final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
- while (!appDone) {
- loopCounter++;
- final long currentTimeMillis = System.currentTimeMillis();
+ while (!appDone) {
+ loopCounter++;
+ final long currentTimeMillis = System.currentTimeMillis();
- if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
- String applicationId = appAttemptID.getApplicationId().toString();
- expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
- }
+ if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
+ String applicationId = appAttemptID.getApplicationId().toString();
+ expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
+ }
- if (currentTimeMillis > nodeReportUpdateTime) {
- resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
- nodeReportUpdateTime = currentTimeMillis + UPDATE_NODE_REPORTS_INTERVAL;
- }
+ if (currentTimeMillis > nodeReportUpdateTime) {
+ resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
+ nodeReportUpdateTime = currentTimeMillis + UPDATE_NODE_REPORTS_INTERVAL;
+ }
- Runnable r;
- while ((r = this.pendingTasks.poll()) != null) {
- r.run();
- }
+ Runnable r;
+ while ((r = this.pendingTasks.poll()) != null) {
+ r.run();
+ }
- // log current state
- /*
- * LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" +
- * numTotalContainers + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers +
- * ", failed=" + numFailedContainers + ", currentAllocated=" + this.allAllocatedContainers.size());
- */
- // Sleep before each loop when asking RM for containers
- // to avoid flooding RM with spurious requests when it
- // need not have any available containers
- try {
- sleep(1000);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted " + e.getMessage());
- }
+ // log current state
+ /*
+ * LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" +
+ * numTotalContainers + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers +
+ * ", failed=" + numFailedContainers + ", currentAllocated=" + this.allAllocatedContainers.size());
+ */
+ // Sleep before each loop when asking RM for containers
+ // to avoid flooding RM with spurious requests when it
+ // need not have any available containers
+ try {
+ sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted", e);
+ }
- // Setup request to be sent to RM to allocate containers
- List<ContainerRequest> containerRequests = new ArrayList<>();
- List<ContainerRequest> removedContainerRequests = new ArrayList<>();
-
- // request containers for pending deploy requests
- if (!dnmgr.containerStartRequests.isEmpty()) {
- StreamingContainerAgent.ContainerStartRequest csr;
- while ((csr = dnmgr.containerStartRequests.poll()) != null) {
- if (csr.container.getRequiredMemoryMB() > maxMem) {
- LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", csr.container.getRequiredMemoryMB(), maxMem);
- csr.container.setRequiredMemoryMB(maxMem);
- }
- if (csr.container.getRequiredMemoryMB() < minMem) {
- csr.container.setRequiredMemoryMB(minMem);
- }
- if (csr.container.getRequiredVCores() > maxVcores) {
- LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", csr.container.getRequiredVCores(), maxVcores);
- csr.container.setRequiredVCores(maxVcores);
- }
- if (csr.container.getRequiredVCores() < minVcores) {
- csr.container.setRequiredVCores(minVcores);
- }
- csr.container.setResourceRequestPriority(nextRequestPriority++);
- ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
- if (cr == null) {
- pendingContainerStartRequests.add(csr);
- } else {
- resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
+ // Setup request to be sent to RM to allocate containers
+ List<ContainerRequest> containerRequests = new ArrayList<>();
+ List<ContainerRequest> removedContainerRequests = new ArrayList<>();
+
+ // request containers for pending deploy requests
+ if (!dnmgr.containerStartRequests.isEmpty()) {
+ StreamingContainerAgent.ContainerStartRequest csr;
+ while ((csr = dnmgr.containerStartRequests.poll()) != null) {
+ if (csr.container.getRequiredMemoryMB() > maxMem) {
+ LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", csr.container.getRequiredMemoryMB(), maxMem);
+ csr.container.setRequiredMemoryMB(maxMem);
+ }
+ if (csr.container.getRequiredMemoryMB() < minMem) {
+ csr.container.setRequiredMemoryMB(minMem);
+ }
+ if (csr.container.getRequiredVCores() > maxVcores) {
+ LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", csr.container.getRequiredVCores(), maxVcores);
+ csr.container.setRequiredVCores(maxVcores);
+ }
+ if (csr.container.getRequiredVCores() < minVcores) {
+ csr.container.setRequiredVCores(minVcores);
+ }
+ csr.container.setResourceRequestPriority(nextRequestPriority++);
+ ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
+ if (cr == null) {
+ pendingContainerStartRequests.add(csr);
+ } else {
+ resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
+ }
}
}
- }
- // If all other requests are allocated, retry pending requests which need host availability
- if (containerRequests.isEmpty() && !pendingContainerStartRequests.isEmpty()) {
- List<ContainerStartRequest> removalList = new LinkedList<>();
- for (ContainerStartRequest csr : pendingContainerStartRequests) {
- ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
- if (cr != null) {
- resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
- removalList.add(csr);
+ // If all other requests are allocated, retry pending requests which need host availability
+ if (containerRequests.isEmpty() && !pendingContainerStartRequests.isEmpty()) {
+ List<ContainerStartRequest> removalList = new LinkedList<>();
+ for (ContainerStartRequest csr : pendingContainerStartRequests) {
+ ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
+ if (cr != null) {
+ resourceRequestor.addContainerRequest(requestedResources, loopCounter, containerRequests, csr, cr);
+ removalList.add(csr);
+ }
}
+ pendingContainerStartRequests.removeAll(removalList);
}
- pendingContainerStartRequests.removeAll(removalList);
- }
- resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
+ resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
/* Remove nodes from blacklist after timeout */
- List<String> blacklistRemovals = new ArrayList<>();
- for (String hostname : failedBlackListedNodes) {
- Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
- if (timeDiff >= blacklistRemovalTime) {
- blacklistRemovals.add(hostname);
- failedContainerNodesMap.remove(hostname);
+ List<String> blacklistRemovals = new ArrayList<>();
+ for (String hostname : failedBlackListedNodes) {
+ Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
+ if (timeDiff >= blacklistRemovalTime) {
+ blacklistRemovals.add(hostname);
+ failedContainerNodesMap.remove(hostname);
+ }
+ }
+ if (!blacklistRemovals.isEmpty()) {
+ amRmClient.updateBlacklist(null, blacklistRemovals);
+ LOG.info("Removing nodes {} from blacklist: time elapsed since last blacklisting due to failure is greater than specified timeout", blacklistRemovals.toString());
+ failedBlackListedNodes.removeAll(blacklistRemovals);
}
- }
- if (!blacklistRemovals.isEmpty()) {
- amRmClient.updateBlacklist(null, blacklistRemovals);
- LOG.info("Removing nodes {} from blacklist: time elapsed since last blacklisting due to failure is greater than specified timeout", blacklistRemovals.toString());
- failedBlackListedNodes.removeAll(blacklistRemovals);
- }
- numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
- AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
- if (amResp.getAMCommand() != null) {
- LOG.info(" statement executed:{}", amResp.getAMCommand());
- switch (amResp.getAMCommand()) {
- case AM_RESYNC:
- case AM_SHUTDOWN:
- throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
- default:
- throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
+ numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
+ AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
+ if (amResp.getAMCommand() != null) {
+ LOG.info(" statement executed:{}", amResp.getAMCommand());
+ switch (amResp.getAMCommand()) {
+ case AM_RESYNC:
+ case AM_SHUTDOWN:
+ throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
+ default:
+ throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
- }
- }
- releasedContainers.clear();
-
- // Retrieve list of allocated containers from the response
- List<Container> newAllocatedContainers = amResp.getAllocatedContainers();
- // LOG.info("Got response from RM for container ask, allocatedCnt=" + newAllocatedContainers.size());
- numRequestedContainers -= newAllocatedContainers.size();
- long timestamp = System.currentTimeMillis();
- for (Container allocatedContainer : newAllocatedContainers) {
-
- LOG.info("Got new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", priority" + allocatedContainer.getPriority());
- // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
-
- boolean alreadyAllocated = true;
- StreamingContainerAgent.ContainerStartRequest csr = null;
- for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
- if (entry.getKey().container.getResourceRequestPriority() == allocatedContainer.getPriority().getPriority()) {
- alreadyAllocated = false;
- csr = entry.getKey();
- break;
}
}
+ releasedContainers.clear();
+
+ // Retrieve list of allocated containers from the response
+ List<Container> newAllocatedContainers = amResp.getAllocatedContainers();
+ // LOG.info("Got response from RM for container ask, allocatedCnt=" + newAllocatedContainers.size());
+ numRequestedContainers -= newAllocatedContainers.size();
+ long timestamp = System.currentTimeMillis();
+ for (Container allocatedContainer : newAllocatedContainers) {
+
+ LOG.info("Got new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", priority" + allocatedContainer.getPriority());
+ // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
+
+ boolean alreadyAllocated = true;
+ StreamingContainerAgent.ContainerStartRequest csr = null;
+ for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
+ if (entry.getKey().container.getResourceRequestPriority() == allocatedContainer.getPriority().getPriority()) {
+ alreadyAllocated = false;
+ csr = entry.getKey();
+ break;
+ }
+ }
- if (alreadyAllocated) {
- LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
- releasedContainers.add(allocatedContainer.getId());
- numReleasedContainers++;
- numRequestedContainers--;
- continue;
- }
- if (csr != null) {
- requestedResources.remove(csr);
- }
+ if (alreadyAllocated) {
+ LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
+ releasedContainers.add(allocatedContainer.getId());
+ numReleasedContainers++;
+ numRequestedContainers--;
+ continue;
+ }
+ if (csr != null) {
+ requestedResources.remove(csr);
+ }
- // allocate resource to container
- ContainerResource resource = new ContainerResource(allocatedContainer.getPriority().getPriority(), allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString(), allocatedContainer.getResource().getMemory(), allocatedContainer.getResource().getVirtualCores(), allocatedContainer.getNodeHttpAddress());
- StreamingContainerAgent sca = dnmgr.assignContainer(resource, null);
+ // allocate resource to container
+ ContainerResource resource = new ContainerResource(allocatedContainer.getPriority().getPriority(), allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString(), allocatedContainer.getResource().getMemory(), allocatedContainer.getResource().getVirtualCores(), allocatedContainer.getNodeHttpAddress());
+ StreamingContainerAgent sca = dnmgr.assignContainer(resource, null);
- if (sca == null) {
- // allocated container no longer needed, add release request
- LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
- releasedContainers.add(allocatedContainer.getId());
- } else {
- AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
- this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
- ByteBuffer tokens = null;
- if (UserGroupInformation.isSecurityEnabled()) {
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- Token<StramDelegationTokenIdentifier> delegationToken = allocateDelegationToken(ugi.getUserName(), heartbeatListener.getAddress());
- allocatedContainerHolder.delegationToken = delegationToken;
- //ByteBuffer tokens = LaunchContainerRunnable.getTokens(delegationTokenManager, heartbeatListener.getAddress());
- tokens = LaunchContainerRunnable.getTokens(ugi, delegationToken);
+ if (sca == null) {
+ // allocated container no longer needed, add release request
+ LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
+ releasedContainers.add(allocatedContainer.getId());
+ } else {
+ AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
+ this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
+ ByteBuffer tokens = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ Token<StramDelegationTokenIdentifier> delegationToken = allocateDelegationToken(ugi.getUserName(), heartbeatListener.getAddress());
+ allocatedContainerHolder.delegationToken = delegationToken;
+ //ByteBuffer tokens = LaunchContainerRunnable.getTokens(delegationTokenManager, heartbeatListener.getAddress());
+ tokens = LaunchContainerRunnable.getTokens(ugi, delegationToken);
+ }
+ LaunchContainerRunnable launchContainer = new LaunchContainerRunnable(allocatedContainer, nmClient, sca, tokens);
+ // Thread launchThread = new Thread(runnableLaunchContainer);
+ // launchThreads.add(launchThread);
+ // launchThread.start();
+ launchContainer.run(); // communication with NMs is now async
+
+ // record container start event
+ StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
+ allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString()));
+ ev.setTimestamp(timestamp);
+ dnmgr.recordEventAsync(ev);
}
- LaunchContainerRunnable launchContainer = new LaunchContainerRunnable(allocatedContainer, nmClient, sca, tokens);
- // Thread launchThread = new Thread(runnableLaunchContainer);
- // launchThreads.add(launchThread);
- // launchThread.start();
- launchContainer.run(); // communication with NMs is now async
-
- // record container start event
- StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(),
- allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString()));
- ev.setTimestamp(timestamp);
- dnmgr.recordEventAsync(ev);
}
- }
- // track node updates for future locality constraint allocations
- // TODO: it seems 2.0.4-alpha doesn't give us any updates
- resourceRequestor.updateNodeReports(amResp.getUpdatedNodes());
+ // track node updates for future locality constraint allocations
+ // TODO: it seems 2.0.4-alpha doesn't give us any updates
+ resourceRequestor.updateNodeReports(amResp.getUpdatedNodes());
- // Check the completed containers
- List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
- // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
- List<String> blacklistAdditions = new ArrayList<>();
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
+ // Check the completed containers
+ List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
+ // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+ List<String> blacklistAdditions = new ArrayList<>();
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
- // non complete containers should not be here
- assert (containerStatus.getState() == ContainerState.COMPLETE);
+ // non complete containers should not be here
+ assert (containerStatus.getState() == ContainerState.COMPLETE);
- AllocatedContainer allocatedContainer = allocatedContainers.remove(containerStatus.getContainerId().toString());
- if (allocatedContainer != null && allocatedContainer.delegationToken != null) {
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
- }
- EventGroupId groupId = null;
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- if (allocatedContainer != null) {
- numFailedContainers.incrementAndGet();
- if (exitStatus != 1 && maxConsecutiveContainerFailures != Integer.MAX_VALUE) {
- // If container failure due to framework
- String hostname = allocatedContainer.container.getNodeId().getHost();
- if (!failedBlackListedNodes.contains(hostname)) {
- // Blacklist the node if not already blacklisted
- if (failedContainerNodesMap.containsKey(hostname)) {
- NodeFailureStats stats = failedContainerNodesMap.get(hostname);
- long timeStamp = System.currentTimeMillis();
- if (timeStamp - stats.lastFailureTimeStamp >= blacklistRemovalTime) {
- // Reset failure count if last failure was before Blacklist removal time
- stats.failureCount = 1;
- stats.lastFailureTimeStamp = timeStamp;
- } else {
- stats.lastFailureTimeStamp = timeStamp;
- stats.failureCount++;
- if (stats.failureCount >= maxConsecutiveContainerFailures) {
- LOG.info("Node {} failed {} times consecutively within {} minutes, marking the node blacklisted", hostname, stats.failureCount, blacklistRemovalTime / (60 * 1000));
- blacklistAdditions.add(hostname);
- failedBlackListedNodes.add(hostname);
+ AllocatedContainer allocatedContainer = allocatedContainers.remove(containerStatus.getContainerId().toString());
+ if (allocatedContainer != null && allocatedContainer.delegationToken != null) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
+ }
+ EventGroupId groupId = null;
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ if (allocatedContainer != null) {
+ numFailedContainers.incrementAndGet();
+ if (exitStatus != 1 && maxConsecutiveContainerFailures != Integer.MAX_VALUE) {
+ // If container failure due to framework
+ String hostname = allocatedContainer.container.getNodeId().getHost();
+ if (!failedBlackListedNodes.contains(hostname)) {
+ // Blacklist the node if not already blacklisted
+ if (failedContainerNodesMap.containsKey(hostname)) {
+ NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+ long timeStamp = System.currentTimeMillis();
+ if (timeStamp - stats.lastFailureTimeStamp >= blacklistRemovalTime) {
+ // Reset failure count if last failure was before Blacklist removal time
+ stats.failureCount = 1;
+ stats.lastFailureTimeStamp = timeStamp;
+ } else {
+ stats.lastFailureTimeStamp = timeStamp;
+ stats.failureCount++;
+ if (stats.failureCount >= maxConsecutiveContainerFailures) {
+ LOG.info("Node {} failed {} times consecutively within {} minutes, marking the node blacklisted", hostname, stats.failureCount, blacklistRemovalTime / (60 * 1000));
+ blacklistAdditions.add(hostname);
+ failedBlackListedNodes.add(hostname);
+ }
}
+ } else {
+ failedContainerNodesMap.put(hostname, new NodeFailureStats(System.currentTimeMillis(), 1));
}
- } else {
- failedContainerNodesMap.put(hostname, new NodeFailureStats(System.currentTimeMillis(), 1));
}
}
}
- }
// if (exitStatus == 1) {
// // non-recoverable StreamingContainer failure
// appDone = true;
@@ -1003,62 +999,63 @@ public class StreamingAppMasterService extends CompositeService
// LOG.info("Exiting due to: {}", dnmgr.shutdownDiagnosticsMessage);
// }
// else {
- // Recoverable failure or process killed (externally or via stop request by AM)
- // also occurs when a container was released by the application but never assigned/launched
- LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
- String containerIdStr = containerStatus.getContainerId().toString();
- dnmgr.scheduleContainerRestart(containerIdStr);
- groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr);
+ // Recoverable failure or process killed (externally or via stop request by AM)
+ // also occurs when a container was released by the application but never assigned/launched
+ LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
+ String containerIdStr = containerStatus.getContainerId().toString();
+ dnmgr.scheduleContainerRestart(containerIdStr);
+ groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr);
// }
- } else {
- // container completed successfully
- numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
- // Reset counter for node failure, if exists
- String hostname = allocatedContainer.container.getNodeId().getHost();
- NodeFailureStats stats = failedContainerNodesMap.get(hostname);
- if (stats != null) {
- stats.failureCount = 0;
+ } else {
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+ // Reset counter for node failure, if exists
+ String hostname = allocatedContainer.container.getNodeId().getHost();
+ NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+ if (stats != null) {
+ stats.failureCount = 0;
+ }
}
- }
- String containerIdStr = containerStatus.getContainerId().toString();
- dnmgr.removeContainerAgent(containerIdStr);
+ String containerIdStr = containerStatus.getContainerId().toString();
+ dnmgr.removeContainerAgent(containerIdStr);
- // record container stop event
- StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(), groupId);
- ev.setReason(containerStatus.getDiagnostics());
- dnmgr.recordEventAsync(ev);
- }
+ // record container stop event
+ StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(), groupId);
+ ev.setReason(containerStatus.getDiagnostics());
+ dnmgr.recordEventAsync(ev);
+ }
- if (!blacklistAdditions.isEmpty()) {
- amRmClient.updateBlacklist(blacklistAdditions, null);
- long timeStamp = System.currentTimeMillis();
- for (String hostname : blacklistAdditions) {
- NodeFailureStats stats = failedContainerNodesMap.get(hostname);
- stats.blackListAdditionTime = timeStamp;
+ if (!blacklistAdditions.isEmpty()) {
+ amRmClient.updateBlacklist(blacklistAdditions, null);
+ long timeStamp = System.currentTimeMillis();
+ for (String hostname : blacklistAdditions) {
+ NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+ stats.blackListAdditionTime = timeStamp;
+ }
+ }
+ if (dnmgr.forcedShutdown) {
+ LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
+ finalStatus = FinalApplicationStatus.FAILED;
+ appDone = true;
+ } else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
+ LOG.debug("Exiting as no more containers are allocated or requested");
+ finalStatus = FinalApplicationStatus.SUCCEEDED;
+ appDone = true;
}
- }
- if (dnmgr.forcedShutdown) {
- LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
- finalStatus = FinalApplicationStatus.FAILED;
- appDone = true;
- } else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
- LOG.debug("Exiting as no more containers are allocated or requested");
- finalStatus = FinalApplicationStatus.SUCCEEDED;
- appDone = true;
- }
- LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
- loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
+ LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
+ loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
- // monitor child containers
- dnmgr.monitorHeartbeat(waitForRecovery > 0);
+ // monitor child containers
+ dnmgr.monitorHeartbeat(waitForRecovery > 0);
- waitForRecovery = Math.max(waitForRecovery - 1, 0);
- }
+ waitForRecovery = Math.max(waitForRecovery - 1, 0);
+ }
- finishApplication(finalStatus);
+ finishApplication(finalStatus);
+ }
}
private void finishApplication(FinalApplicationStatus finalStatus) throws YarnException, IOException
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 152e6f1..903dad2 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -169,7 +169,7 @@ public class ApexCli
protected Configuration conf;
private FileSystem fs;
private StramAgent stramAgent;
- private final YarnClient yarnClient = YarnClient.createYarnClient();
+ private YarnClient yarnClient = null;
private ApplicationReport currentApp = null;
private boolean consolePresent;
private String[] commandsToExecute;
@@ -1208,8 +1208,7 @@ public class ApexCli
fs = StramClientUtils.newFileSystemInstance(conf);
stramAgent = new StramAgent(fs, conf);
- yarnClient.init(conf);
- yarnClient.start();
+ yarnClient = StramClientUtils.createYarnClient(conf);
LOG.debug("Yarn Client initialized and started");
String socks = conf.get(CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY);
if (socks != null) {
@@ -2402,6 +2401,7 @@ public class ApexCli
LOG.warn("Cannot flush command history");
}
}
+ yarnClient.stop();
System.exit(0);
}
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
index a1ac8ca..ce7507d 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java
@@ -331,12 +331,8 @@ public class StramAgent extends FSAgent
private StramWebServicesInfo retrieveWebServicesInfo(String appId)
{
- YarnClient yarnClient = YarnClient.createYarnClient();
String url;
- try {
- yarnClient.init(conf);
- yarnClient.start();
-
+ try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
ApplicationReport ar = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
if (ar == null) {
LOG.warn("YARN does not have record for this application {}", appId);
@@ -364,8 +360,6 @@ public class StramAgent extends FSAgent
} catch (Exception ex) {
LOG.error("Cannot retrieve web services info", ex);
return null;
- } finally {
- yarnClient.stop();
}
WebServicesClient webServicesClient = new WebServicesClient();
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d8caa1e..a310ee2 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -317,6 +317,14 @@ public class StramClientUtils
}
+ public static YarnClient createYarnClient(Configuration conf)
+ {
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(conf);
+ client.start();
+ return client;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class);
public static String getHostName()
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 83aa781..71eb825 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -110,22 +110,14 @@ public class StramUserLogin
public Object run() throws Exception
{
- YarnClient yarnClient = null;
- if (renewRMToken) {
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
- }
Credentials creds = new Credentials();
try (FileSystem fs1 = FileSystem.newInstance(conf)) {
fs1.addDelegationTokens(tokenRenewer, creds);
- if (renewRMToken) {
+ }
+ if (renewRMToken) {
+ try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
}
- } finally {
- if (renewRMToken) {
- yarnClient.stop();
- }
}
credentials.addAll(creds);
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 3c49a71..9a69b08 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.apex.api.YarnAppLauncher;
import org.apache.apex.engine.util.StreamingAppFactory;
-import org.apache.bval.jsr303.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -37,6 +36,7 @@ import com.google.common.base.Throwables;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -80,7 +80,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
}
};
ApplicationId appId = appLauncher.launchApp(appFactory);
- return new YarnAppHandleImpl(appId);
+ return new YarnAppHandleImpl(appId, conf);
} catch (Exception ex) {
throw new LauncherException(ex);
}
@@ -89,18 +89,15 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
protected void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
{
if (shutdownMode == ShutdownMode.KILL) {
- YarnClient yarnClient = YarnClient.createYarnClient();
try {
ApplicationId applicationId = app.appId;
- ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+ ApplicationReport appReport = app.yarnClient.getApplicationReport(applicationId);
if (appReport == null) {
throw new LauncherException("Application " + app.getApplicationId() + " not found");
}
- yarnClient.killApplication(applicationId);
+ app.yarnClient.killApplication(applicationId);
} catch (YarnException | IOException e) {
throw Throwables.propagate(e);
- } finally {
- IOUtils.closeQuietly(yarnClient);
}
} else {
throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead");
@@ -124,13 +121,15 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
}
}
- public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
+ public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle, AutoCloseable
{
final ApplicationId appId;
+ private final YarnClient yarnClient;
- public YarnAppHandleImpl(ApplicationId appId)
+ public YarnAppHandleImpl(ApplicationId appId, Configuration conf)
{
this.appId = appId;
+ this.yarnClient = StramClientUtils.createYarnClient(conf);
}
@Override
@@ -142,7 +141,6 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
@Override
public boolean isFinished()
{
- YarnClient yarnClient = YarnClient.createYarnClient();
try {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport != null) {
@@ -154,8 +152,6 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
return true;
} catch (YarnException | IOException e) {
throw Throwables.propagate(e);
- } finally {
- IOUtils.closeQuietly(yarnClient);
}
}
@@ -164,8 +160,16 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
throws org.apache.apex.api.Launcher.LauncherException
{
shutdownApp(this, shutdownMode);
-
}
+ @Override
+ public void close() throws Exception
+ {
+ // Call close() rather than stop() on YarnClient: this close method is itself normally invoked as part of
+ // closeable handling, so delegating to close() on YarnClient keeps that pattern consistent. In effect this
+ // is the same as calling stop(), because the YarnClient documentation specifies that close() results in a
+ // call to stop().
+ yarnClient.close();
+ }
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].