You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/02/02 21:04:16 UTC
hadoop git commit: YARN-2808. Made YARN CLI list attempt’s finished containers of a running application. Contributed by Naganarasimha G R.
Repository: hadoop
Updated Branches:
refs/heads/trunk 1c09ca2ba -> 52575ff22
YARN-2808. Made YARN CLI list attempt’s finished containers of a running application. Contributed by Naganarasimha G R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52575ff2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52575ff2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52575ff2
Branch: refs/heads/trunk
Commit: 52575ff2240ea0b7cea818ef34503576c0ecf9f7
Parents: 1c09ca2
Author: Zhijie Shen <zj...@apache.org>
Authored: Mon Feb 2 12:03:20 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Feb 2 12:03:52 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../yarn/client/api/impl/YarnClientImpl.java | 79 +++++++++++++++++---
.../yarn/client/api/impl/TestYarnClient.java | 54 ++++++++++++-
3 files changed, 123 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52575ff2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8a05b5c..bc20e88 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -225,6 +225,9 @@ Release 2.7.0 - UNRELEASED
YARN-3108. ApplicationHistoryServer doesn't process -D arguments (Chang Li
via jeagles)
+ YARN-2808. Made YARN CLI list attempt’s finished containers of a running
+ application. (Naganarasimha G R via zjshen)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52575ff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index e4f31f2..99c0f02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -118,7 +120,7 @@ public class YarnClientImpl extends YarnClient {
protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis;
private long asyncApiPollTimeoutMillis;
- private AHSClient historyClient;
+ protected AHSClient historyClient;
private boolean historyServiceEnabled;
protected TimelineClient timelineClient;
@VisibleForTesting
@@ -647,24 +649,79 @@ public class YarnClientImpl extends YarnClient {
public List<ContainerReport> getContainers(
ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException {
+ List<ContainerReport> containersForAttempt =
+ new ArrayList<ContainerReport>();
+ boolean appNotFoundInRM = false;
try {
- GetContainersRequest request = Records
- .newRecord(GetContainersRequest.class);
+ GetContainersRequest request =
+ Records.newRecord(GetContainersRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
GetContainersResponse response = rmClient.getContainers(request);
- return response.getContainerList();
+ containersForAttempt.addAll(response.getContainerList());
} catch (YarnException e) {
- if (!historyServiceEnabled) {
- // Just throw it as usual if historyService is not enabled.
+ if (e.getClass() != ApplicationNotFoundException.class
+ || !historyServiceEnabled) {
+ // If Application is not in RM and history service is enabled then we
+ // need to check with history service else throw exception.
throw e;
}
- // Even if history-service is enabled, treat all exceptions still the same
- // except the following
- if (e.getClass() != ApplicationNotFoundException.class) {
- throw e;
+ appNotFoundInRM = true;
+ }
+
+ if (historyServiceEnabled) {
+ // Check with AHS even if found in RM because to capture info of finished
+ // containers also
+ List<ContainerReport> containersListFromAHS = null;
+ try {
+ containersListFromAHS =
+ historyClient.getContainers(applicationAttemptId);
+ } catch (IOException e) {
+ // History service access might be enabled but system metrics publisher
+ // is disabled hence app not found exception is possible
+ if (appNotFoundInRM) {
+ // app not found in bothM and RM then propagate the exception.
+ throw e;
+ }
+ }
+
+ if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
+ // remove duplicates
+
+ Set<ContainerId> containerIdsToBeKeptFromAHS =
+ new HashSet<ContainerId>();
+ Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
+ while (tmpItr.hasNext()) {
+ containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId());
+ }
+
+ Iterator<ContainerReport> rmContainers =
+ containersForAttempt.iterator();
+ while (rmContainers.hasNext()) {
+ ContainerReport tmp = rmContainers.next();
+ containerIdsToBeKeptFromAHS.remove(tmp.getContainerId());
+ // Remove containers from AHS as container from RM will have latest
+ // information
+ }
+
+ if (containerIdsToBeKeptFromAHS.size() > 0
+ && containersListFromAHS.size() != containerIdsToBeKeptFromAHS
+ .size()) {
+ Iterator<ContainerReport> containersFromHS =
+ containersListFromAHS.iterator();
+ while (containersFromHS.hasNext()) {
+ ContainerReport containerReport = containersFromHS.next();
+ if (containerIdsToBeKeptFromAHS.contains(containerReport
+ .getContainerId())) {
+ containersForAttempt.add(containerReport);
+ }
+ }
+ } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS
+ .size()) {
+ containersForAttempt.addAll(containersListFromAHS);
+ }
}
- return historyClient.getContainers(applicationAttemptId);
}
+ return containersForAttempt;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52575ff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 02f2882..7859688 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AHSClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -338,6 +339,9 @@ public class TestYarnClient {
@Test(timeout = 10000)
public void testGetContainers() throws YarnException, IOException {
Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+ true);
+
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
@@ -351,6 +355,17 @@ public class TestYarnClient {
(ContainerId.newContainerId(appAttemptId, 1)));
Assert.assertEquals(reports.get(1).getContainerId(),
(ContainerId.newContainerId(appAttemptId, 2)));
+ Assert.assertEquals(reports.get(2).getContainerId(),
+ (ContainerId.newContainerId(appAttemptId, 3)));
+
+ //First2 containers should come from RM with updated state information and
+ // 3rd container is not there in RM and should
+ Assert.assertEquals(ContainerState.RUNNING,
+ (reports.get(0).getContainerState()));
+ Assert.assertEquals(ContainerState.RUNNING,
+ (reports.get(1).getContainerState()));
+ Assert.assertEquals(ContainerState.COMPLETE,
+ (reports.get(2).getContainerState()));
client.stop();
}
@@ -383,6 +398,9 @@ public class TestYarnClient {
new HashMap<ApplicationId, List<ApplicationAttemptReport>>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containers =
new HashMap<ApplicationAttemptId, List<ContainerReport>>();
+ private HashMap<ApplicationAttemptId, List<ContainerReport>> containersFromAHS =
+ new HashMap<ApplicationAttemptId, List<ContainerReport>>();
+
GetApplicationsResponse mockAppResponse =
mock(GetApplicationsResponse.class);
GetApplicationAttemptsResponse mockAppAttemptsResponse =
@@ -428,6 +446,9 @@ public class TestYarnClient {
when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
.thenReturn(mockContainerResponse);
+
+ historyClient = mock(AHSClient.class);
+
} catch (YarnException e) {
Assert.fail("Exception is not expected.");
} catch (IOException e) {
@@ -501,15 +522,37 @@ public class TestYarnClient {
ContainerReport container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
- "diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
+ "diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
containerReports.add(container);
ContainerReport container1 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
- "diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
+ "diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
containerReports.add(container1);
containers.put(attempt.getApplicationAttemptId(), containerReports);
+
+ //add containers to be sent from AHS
+ List<ContainerReport> containerReportsForAHS =
+ new ArrayList<ContainerReport>();
+
+ container = ContainerReport.newInstance(
+ ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
+ NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
+ "diagnosticInfo", "logURL", 0, null);
+ containerReportsForAHS.add(container);
+
+ container1 = ContainerReport.newInstance(
+ ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
+ NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
+ "diagnosticInfo", "HSlogURL", 0, null);
+ containerReportsForAHS.add(container1);
+ ContainerReport container2 = ContainerReport.newInstance(
+ ContainerId.newContainerId(attempt.getApplicationAttemptId(),3), null,
+ NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
+ "diagnosticInfo", "HSlogURL", 0, ContainerState.COMPLETE);
+ containerReportsForAHS.add(container2);
+ containersFromAHS.put(attempt.getApplicationAttemptId(), containerReportsForAHS);
ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6);
ApplicationReport newApplicationReport2 = ApplicationReport.newInstance(
@@ -586,9 +629,16 @@ public class TestYarnClient {
IOException {
when(mockContainersResponse.getContainerList()).thenReturn(
getContainersReport(appAttemptId));
+ when(historyClient.getContainers(any(ApplicationAttemptId.class)))
+ .thenReturn(getContainersFromAHS(appAttemptId));
return super.getContainers(appAttemptId);
}
+ private List<ContainerReport> getContainersFromAHS(
+ ApplicationAttemptId appAttemptId) {
+ return containersFromAHS.get(appAttemptId);
+ }
+
@Override
public ContainerReport getContainerReport(ContainerId containerId)
throws YarnException, IOException {