You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/27 23:58:25 UTC

[14/37] hadoop git commit: YARN-3967. Fetch the application report from the AHS if the RM does not know about it. Contributed by Mit Desai

YARN-3967. Fetch the application report from the AHS if the RM does not
know about it. Contributed by Mit Desai


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fbd60632
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fbd60632
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fbd60632

Branch: refs/heads/HDFS-7240
Commit: fbd6063269221ec25834684477f434e19f0b66af
Parents: ee233ec
Author: Xuan <xg...@apache.org>
Authored: Fri Jul 24 10:15:54 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Fri Jul 24 10:15:54 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../yarn/server/webproxy/AppReportFetcher.java  |  79 +++++++++++--
 .../server/webproxy/TestAppReportFetcher.java   | 117 +++++++++++++++++++
 3 files changed, 187 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbd60632/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8bc9e4c..a25387d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -683,6 +683,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3170. YARN architecture document needs updating. (Brahma Reddy Battula
     via ozawa)
 
+    YARN-3967. Fetch the application report from the AHS if the RM does not know about it.
+    (Mit Desai via xgong)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbd60632/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
index 5c93413..6aa43eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
@@ -24,11 +24,15 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.AHSProxy;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -41,38 +45,73 @@ public class AppReportFetcher {
   private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
   private final Configuration conf;
   private final ApplicationClientProtocol applicationsManager;
+  private final ApplicationHistoryProtocol historyManager;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
+  private boolean isAHSEnabled;
+
   /**
-   * Create a new Connection to the RM to fetch Application reports.
+   * Create a new Connection to the RM/Application History Server
+   * to fetch Application reports.
    * @param conf the conf to use to know where the RM is.
    */
   public AppReportFetcher(Configuration conf) {
+    if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+        YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
+      isAHSEnabled = true;
+    }
     this.conf = conf;
     try {
       applicationsManager = ClientRMProxy.createRMProxy(conf,
           ApplicationClientProtocol.class);
+      if (isAHSEnabled) {
+        historyManager = getAHSProxy(conf);
+      } else {
+        this.historyManager = null;
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }
   }
   
   /**
-   * Just call directly into the applicationsManager given instead of creating
-   * a remote connection to it.  This is mostly for when the Proxy is running
-   * as part of the RM already.
+   * Create a direct connection to RM instead of a remote connection when
+   * the proxy is running as part of the RM. Also create a remote connection to
+   * Application History Server if it is enabled.
    * @param conf the configuration to use
    * @param applicationsManager what to use to get the RM reports.
    */
   public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
+    if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+        YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
+      isAHSEnabled = true;
+    }
     this.conf = conf;
     this.applicationsManager = applicationsManager;
+    if (isAHSEnabled) {
+      try {
+        historyManager = getAHSProxy(conf);
+      } catch (IOException e) {
+        throw new YarnRuntimeException(e);
+      }
+    } else {
+      this.historyManager = null;
+    }
   }
-  
+
+  protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
+      throws IOException {
+    return AHSProxy.createAHSProxy(configuration,
+      ApplicationHistoryProtocol.class,
+      configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
+  }
+
   /**
-   * Get a report for the specified app.
-   * @param appId the id of the application to get. 
-   * @return the ApplicationReport for that app.
+   * Get an application report for the specified application id from the RM and
+   * fall back to the Application History Server if not found in RM.
+   * @param appId id of the application to get.
+   * @return the ApplicationReport for the appId.
    * @throws YarnException on any error.
    * @throws IOException
    */
@@ -81,9 +120,22 @@ public class AppReportFetcher {
     GetApplicationReportRequest request = recordFactory
         .newRecordInstance(GetApplicationReportRequest.class);
     request.setApplicationId(appId);
-    
-    GetApplicationReportResponse response = applicationsManager
-        .getApplicationReport(request);
+
+    GetApplicationReportResponse response;
+    try {
+      response = applicationsManager.getApplicationReport(request);
+    } catch (YarnException e) {
+      if (!isAHSEnabled) {
+        // Just throw it as usual if historyService is not enabled.
+        throw e;
+      }
+      // Even if history-service is enabled, treat all exceptions still the same
+      // except the following
+      if (!(e.getClass() == ApplicationNotFoundException.class)) {
+        throw e;
+      }
+      response = historyManager.getApplicationReport(request);
+    }
     return response.getApplicationReport();
   }
 
@@ -91,5 +143,8 @@ public class AppReportFetcher {
     if (this.applicationsManager != null) {
       RPC.stopProxy(this.applicationsManager);
     }
+    if (this.historyManager != null) {
+      RPC.stopProxy(this.historyManager);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbd60632/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java
new file mode 100644
index 0000000..bcab33f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.webproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestAppReportFetcher {
+
+  static ApplicationHistoryProtocol historyManager;
+  static Configuration conf = new Configuration();
+  private static ApplicationClientProtocol appManager;
+  private static AppReportFetcher fetcher;
+  private final String appNotFoundExceptionMsg = "APP NOT FOUND";
+
+  @After
+  public void cleanUp() {
+    historyManager = null;
+    appManager = null;
+    fetcher = null;
+  }
+
+  public void testHelper(boolean isAHSEnabled)
+      throws YarnException, IOException {
+    conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+        isAHSEnabled);
+    appManager = Mockito.mock(ApplicationClientProtocol.class);
+    Mockito.when(appManager
+        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
+        .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
+    fetcher = new AppReportFetcherForTest(conf, appManager);
+    ApplicationId appId = ApplicationId.newInstance(0,0);
+    fetcher.getApplicationReport(appId);
+  }
+
+  @Test
+  public void testFetchReportAHSEnabled() throws YarnException, IOException {
+    testHelper(true);
+    Mockito.verify(historyManager, Mockito.times(1))
+    .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+    Mockito.verify(appManager, Mockito.times(1))
+    .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+  }
+
+  @Test
+  public void testFetchReportAHSDisabled() throws YarnException, IOException {
+    try {
+      testHelper(false);
+    } catch (ApplicationNotFoundException e) {
+      Assert.assertTrue(e.getMessage() == appNotFoundExceptionMsg);
+      /* RM will not know of the app and Application History Service is disabled
+       * So we will not try to get the report from AHS and RM will throw
+       * ApplicationNotFoundException
+       */
+    }
+    Mockito.verify(appManager, Mockito.times(1))
+    .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+    if (historyManager != null) {
+      Assert.fail("HistoryManager should be null as AHS is disabled");
+    }
+  }
+
+  static class AppReportFetcherForTest extends AppReportFetcher {
+
+    public AppReportFetcherForTest(Configuration conf,
+        ApplicationClientProtocol acp) {
+      super(conf, acp);
+    }
+
+    @Override
+    protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
+        throws IOException
+    {
+      GetApplicationReportResponse resp = Mockito.
+          mock(GetApplicationReportResponse.class);
+      historyManager = Mockito.mock(ApplicationHistoryProtocol.class);
+      try {
+        Mockito.when(historyManager.getApplicationReport(Mockito
+            .any(GetApplicationReportRequest.class))).thenReturn(resp);
+      } catch (YarnException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      return historyManager;
+    }
+  }
+}