You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/02/02 21:04:16 UTC

hadoop git commit: YARN-2808. Made YARN CLI list attempt’s finished containers of a running application. Contributed by Naganarasimha G R.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 1c09ca2ba -> 52575ff22


YARN-2808. Made YARN CLI list attempt’s finished containers of a running application. Contributed by Naganarasimha G R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52575ff2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52575ff2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52575ff2

Branch: refs/heads/trunk
Commit: 52575ff2240ea0b7cea818ef34503576c0ecf9f7
Parents: 1c09ca2
Author: Zhijie Shen <zj...@apache.org>
Authored: Mon Feb 2 12:03:20 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Feb 2 12:03:52 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../yarn/client/api/impl/YarnClientImpl.java    | 79 +++++++++++++++++---
 .../yarn/client/api/impl/TestYarnClient.java    | 54 ++++++++++++-
 3 files changed, 123 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52575ff2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8a05b5c..bc20e88 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -225,6 +225,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3108. ApplicationHistoryServer doesn't process -D arguments (Chang Li
     via jeagles)
 
+    YARN-2808. Made YARN CLI list attempt’s finished containers of a running
+    application. (Naganarasimha G R via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52575ff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index e4f31f2..99c0f02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -118,7 +120,7 @@ public class YarnClientImpl extends YarnClient {
   protected long submitPollIntervalMillis;
   private long asyncApiPollIntervalMillis;
   private long asyncApiPollTimeoutMillis;
-  private AHSClient historyClient;
+  protected AHSClient historyClient;
   private boolean historyServiceEnabled;
   protected TimelineClient timelineClient;
   @VisibleForTesting
@@ -647,24 +649,79 @@ public class YarnClientImpl extends YarnClient {
   public List<ContainerReport> getContainers(
       ApplicationAttemptId applicationAttemptId) throws YarnException,
       IOException {
+    List<ContainerReport> containersForAttempt =
+        new ArrayList<ContainerReport>();
+    boolean appNotFoundInRM = false;
     try {
-      GetContainersRequest request = Records
-          .newRecord(GetContainersRequest.class);
+      GetContainersRequest request =
+          Records.newRecord(GetContainersRequest.class);
       request.setApplicationAttemptId(applicationAttemptId);
       GetContainersResponse response = rmClient.getContainers(request);
-      return response.getContainerList();
+      containersForAttempt.addAll(response.getContainerList());
     } catch (YarnException e) {
-      if (!historyServiceEnabled) {
-        // Just throw it as usual if historyService is not enabled.
+      if (e.getClass() != ApplicationNotFoundException.class
+          || !historyServiceEnabled) {
+        // If Application is not in RM and history service is enabled then we
+        // need to check with history service else throw exception.
         throw e;
       }
-      // Even if history-service is enabled, treat all exceptions still the same
-      // except the following
-      if (e.getClass() != ApplicationNotFoundException.class) {
-        throw e;
+      appNotFoundInRM = true;
+    }
+
+    if (historyServiceEnabled) {
+      // Check with AHS even if found in RM because to capture info of finished
+      // containers also
+      List<ContainerReport> containersListFromAHS = null;
+      try {
+        containersListFromAHS =
+            historyClient.getContainers(applicationAttemptId);
+      } catch (IOException e) {
+        // History service access might be enabled but system metrics publisher
+        // is disabled hence app not found exception is possible
+        if (appNotFoundInRM) {
+          // app not found in bothM and RM then propagate the exception.
+          throw e;
+        }
+      }
+
+      if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
+        // remove duplicates
+
+        Set<ContainerId> containerIdsToBeKeptFromAHS =
+            new HashSet<ContainerId>();
+        Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
+        while (tmpItr.hasNext()) {
+          containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId());
+        }
+
+        Iterator<ContainerReport> rmContainers =
+            containersForAttempt.iterator();
+        while (rmContainers.hasNext()) {
+          ContainerReport tmp = rmContainers.next();
+          containerIdsToBeKeptFromAHS.remove(tmp.getContainerId());
+          // Remove containers from AHS as container from RM will have latest
+          // information
+        }
+
+        if (containerIdsToBeKeptFromAHS.size() > 0
+            && containersListFromAHS.size() != containerIdsToBeKeptFromAHS
+                .size()) {
+          Iterator<ContainerReport> containersFromHS =
+              containersListFromAHS.iterator();
+          while (containersFromHS.hasNext()) {
+            ContainerReport containerReport = containersFromHS.next();
+            if (containerIdsToBeKeptFromAHS.contains(containerReport
+                .getContainerId())) {
+              containersForAttempt.add(containerReport);
+            }
+          }
+        } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS
+            .size()) {
+          containersForAttempt.addAll(containersListFromAHS);
+        }
       }
-      return historyClient.getContainers(applicationAttemptId);
     }
+    return containersForAttempt;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52575ff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 02f2882..7859688 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AHSClient;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -338,6 +339,9 @@ public class TestYarnClient {
   @Test(timeout = 10000)
   public void testGetContainers() throws YarnException, IOException {
     Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+        true);
+    
     final YarnClient client = new MockYarnClient();
     client.init(conf);
     client.start();
@@ -351,6 +355,17 @@ public class TestYarnClient {
         (ContainerId.newContainerId(appAttemptId, 1)));
     Assert.assertEquals(reports.get(1).getContainerId(),
         (ContainerId.newContainerId(appAttemptId, 2)));
+    Assert.assertEquals(reports.get(2).getContainerId(),
+        (ContainerId.newContainerId(appAttemptId, 3)));
+    
+    //First2 containers should come from RM with updated state information and 
+    // 3rd container is not there in RM and should
+    Assert.assertEquals(ContainerState.RUNNING,
+        (reports.get(0).getContainerState()));
+    Assert.assertEquals(ContainerState.RUNNING,
+        (reports.get(1).getContainerState()));
+    Assert.assertEquals(ContainerState.COMPLETE,
+        (reports.get(2).getContainerState()));
     client.stop();
   }
 
@@ -383,6 +398,9 @@ public class TestYarnClient {
       new HashMap<ApplicationId, List<ApplicationAttemptReport>>();
     private HashMap<ApplicationAttemptId, List<ContainerReport>> containers = 
       new HashMap<ApplicationAttemptId, List<ContainerReport>>();
+    private HashMap<ApplicationAttemptId, List<ContainerReport>> containersFromAHS =
+      new HashMap<ApplicationAttemptId, List<ContainerReport>>();
+
     GetApplicationsResponse mockAppResponse =
       mock(GetApplicationsResponse.class);
     GetApplicationAttemptsResponse mockAppAttemptsResponse = 
@@ -428,6 +446,9 @@ public class TestYarnClient {
 
         when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
             .thenReturn(mockContainerResponse);
+        
+        historyClient = mock(AHSClient.class);
+        
       } catch (YarnException e) {
         Assert.fail("Exception is not expected.");
       } catch (IOException e) {
@@ -501,15 +522,37 @@ public class TestYarnClient {
       ContainerReport container = ContainerReport.newInstance(
           ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
           NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
-          "diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
+          "diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
       containerReports.add(container);
 
       ContainerReport container1 = ContainerReport.newInstance(
           ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
           NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
-          "diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
+          "diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
       containerReports.add(container1);
       containers.put(attempt.getApplicationAttemptId(), containerReports);
+      
+      //add containers to be sent from AHS
+      List<ContainerReport> containerReportsForAHS =
+          new ArrayList<ContainerReport>();
+      
+      container = ContainerReport.newInstance(
+          ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
+          NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
+          "diagnosticInfo", "logURL", 0, null);
+      containerReportsForAHS.add(container);
+
+      container1 = ContainerReport.newInstance(
+          ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
+          NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
+          "diagnosticInfo", "HSlogURL", 0, null);
+      containerReportsForAHS.add(container1);
+      ContainerReport container2 = ContainerReport.newInstance(
+          ContainerId.newContainerId(attempt.getApplicationAttemptId(),3), null,
+          NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
+          "diagnosticInfo", "HSlogURL", 0, ContainerState.COMPLETE);
+      containerReportsForAHS.add(container2);
+      containersFromAHS.put(attempt.getApplicationAttemptId(), containerReportsForAHS);
 
       ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6);
       ApplicationReport newApplicationReport2 = ApplicationReport.newInstance(
@@ -586,9 +629,16 @@ public class TestYarnClient {
             IOException {
       when(mockContainersResponse.getContainerList()).thenReturn(
         getContainersReport(appAttemptId));
+      when(historyClient.getContainers(any(ApplicationAttemptId.class)))
+      .thenReturn(getContainersFromAHS(appAttemptId));
       return super.getContainers(appAttemptId);
     }
 
+    private List<ContainerReport> getContainersFromAHS(
+        ApplicationAttemptId appAttemptId) {
+      return containersFromAHS.get(appAttemptId);
+    }
+
     @Override
     public ContainerReport getContainerReport(ContainerId containerId)
         throws YarnException, IOException {