You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/28 22:54:57 UTC
[18/50] [abbrv] hadoop git commit: YARN-3967. Fetch the application
report from the AHS if the RM does not know about it. Contributed by Mit
Desai
YARN-3967. Fetch the application report from the AHS if the RM does not
know about it. Contributed by Mit Desai
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fbd60632
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fbd60632
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fbd60632
Branch: refs/heads/YARN-1197
Commit: fbd6063269221ec25834684477f434e19f0b66af
Parents: ee233ec
Author: Xuan <xg...@apache.org>
Authored: Fri Jul 24 10:15:54 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Fri Jul 24 10:15:54 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../yarn/server/webproxy/AppReportFetcher.java | 79 +++++++++++--
.../server/webproxy/TestAppReportFetcher.java | 117 +++++++++++++++++++
3 files changed, 187 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbd60632/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8bc9e4c..a25387d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -683,6 +683,9 @@ Release 2.7.2 - UNRELEASED
YARN-3170. YARN architecture document needs updating. (Brahma Reddy Battula
via ozawa)
+ YARN-3967. Fetch the application report from the AHS if the RM does not know about it.
+ (Mit Desai via xgong)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbd60632/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
index 5c93413..6aa43eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
@@ -24,11 +24,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.AHSProxy;
import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -41,38 +45,73 @@ public class AppReportFetcher {
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
private final Configuration conf;
private final ApplicationClientProtocol applicationsManager;
+ private final ApplicationHistoryProtocol historyManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
+ private boolean isAHSEnabled;
+
/**
- * Create a new Connection to the RM to fetch Application reports.
+ * Create a new Connection to the RM/Application History Server
+ * to fetch Application reports.
* @param conf the conf to use to know where the RM is.
*/
public AppReportFetcher(Configuration conf) {
+ // isAHSEnabled keeps its default (false) unless the application history
+ // service is switched on in the supplied configuration.
+ if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+ YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
+ isAHSEnabled = true;
+ }
this.conf = conf;
try {
applicationsManager = ClientRMProxy.createRMProxy(conf,
ApplicationClientProtocol.class);
+ // Only obtain an AHS proxy when the history service is enabled; otherwise
+ // historyManager is explicitly null. getAHSProxy is protected, so a
+ // subclass may supply its own proxy.
+ if (isAHSEnabled) {
+ historyManager = getAHSProxy(conf);
+ } else {
+ this.historyManager = null;
+ }
} catch (IOException e) {
+ // A failure creating either proxy is rethrown unchecked.
throw new YarnRuntimeException(e);
}
}
/**
- * Just call directly into the applicationsManager given instead of creating
- * a remote connection to it. This is mostly for when the Proxy is running
- * as part of the RM already.
+ * Create a direct connection to RM instead of a remote connection when
+ * the proxy is running as part of the RM. Also create a remote connection to
+ * Application History Server if it is enabled.
* @param conf the configuration to use
* @param applicationsManager what to use to get the RM reports.
*/
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
+ // isAHSEnabled keeps its default (false) unless the application history
+ // service is switched on in the supplied configuration.
+ if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+ YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
+ isAHSEnabled = true;
+ }
this.conf = conf;
this.applicationsManager = applicationsManager;
+ // The RM handle is used exactly as passed in; only the AHS proxy is created
+ // here, and only when the history service is enabled.
+ if (isAHSEnabled) {
+ try {
+ historyManager = getAHSProxy(conf);
+ } catch (IOException e) {
+ // A failure creating the AHS proxy is rethrown unchecked.
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ this.historyManager = null;
+ }
}
-
+
+ /**
+ * Create the proxy used to query application history.
+ * The target address is read from
+ * {@code YarnConfiguration.TIMELINE_SERVICE_ADDRESS}, falling back to the
+ * default timeline service address and port. The method is protected so
+ * that subclasses (for example a test fetcher) can return their own proxy.
+ * @param configuration the configuration to read the address from.
+ * @return an {@code ApplicationHistoryProtocol} proxy.
+ * @throws IOException if the proxy cannot be created.
+ */
+ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
+ throws IOException {
+ return AHSProxy.createAHSProxy(configuration,
+ ApplicationHistoryProtocol.class,
+ configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
+ }
+
/**
- * Get a report for the specified app.
- * @param appId the id of the application to get.
- * @return the ApplicationReport for that app.
+ * Get an application report for the specified application id from the RM and
+ * fall back to the Application History Server if not found in RM.
+ * @param appId id of the application to get.
+ * @return the ApplicationReport for the appId.
* @throws YarnException on any error.
* @throws IOException
*/
@@ -81,9 +120,22 @@ public class AppReportFetcher {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId);
-
- GetApplicationReportResponse response = applicationsManager
- .getApplicationReport(request);
+
+ GetApplicationReportResponse response;
+ try {
+ response = applicationsManager.getApplicationReport(request);
+ } catch (YarnException e) {
+ if (!isAHSEnabled) {
+ // Just throw it as usual if historyService is not enabled.
+ throw e;
+ }
+ // Even if history-service is enabled, treat all exceptions still the same
+ // except the following
+ if (!(e.getClass() == ApplicationNotFoundException.class)) {
+ throw e;
+ }
+ response = historyManager.getApplicationReport(request);
+ }
return response.getApplicationReport();
}
@@ -91,5 +143,8 @@ public class AppReportFetcher {
if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager);
}
+ if (this.historyManager != null) {
+ RPC.stopProxy(this.historyManager);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbd60632/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java
new file mode 100644
index 0000000..bcab33f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.webproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests that {@link AppReportFetcher} falls back to the Application History
+ * Server only when it is enabled and the RM reports the application as
+ * not found.
+ */
+public class TestAppReportFetcher {
+
+  // Set by AppReportFetcherForTest#getAHSProxy; remains null when the fetcher
+  // never asks for an AHS proxy (i.e. AHS is disabled).
+  static ApplicationHistoryProtocol historyManager;
+  static Configuration conf = new Configuration();
+  private static ApplicationClientProtocol appManager;
+  private static AppReportFetcher fetcher;
+  private final String appNotFoundExceptionMsg = "APP NOT FOUND";
+
+  /** Drop the per-test mocks so state cannot leak between test methods. */
+  @After
+  public void cleanUp() {
+    historyManager = null;
+    appManager = null;
+    fetcher = null;
+  }
+
+  /**
+   * Build a fetcher whose RM mock always throws
+   * {@link ApplicationNotFoundException}, then request a report through it.
+   * @param isAHSEnabled value to set for the application-history-enabled flag.
+   * @throws YarnException propagated from the fetcher.
+   * @throws IOException propagated from the fetcher.
+   */
+  public void testHelper(boolean isAHSEnabled)
+      throws YarnException, IOException {
+    conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
+        isAHSEnabled);
+    appManager = Mockito.mock(ApplicationClientProtocol.class);
+    Mockito.when(appManager
+        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
+        .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
+    fetcher = new AppReportFetcherForTest(conf, appManager);
+    ApplicationId appId = ApplicationId.newInstance(0,0);
+    fetcher.getApplicationReport(appId);
+  }
+
+  /** With AHS enabled, the RM is asked once and then the AHS is asked once. */
+  @Test
+  public void testFetchReportAHSEnabled() throws YarnException, IOException {
+    testHelper(true);
+    Mockito.verify(historyManager, Mockito.times(1))
+        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+    Mockito.verify(appManager, Mockito.times(1))
+        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+  }
+
+  /**
+   * With AHS disabled, the RM's ApplicationNotFoundException must reach the
+   * caller and no AHS proxy may ever be created.
+   */
+  @Test
+  public void testFetchReportAHSDisabled() throws YarnException, IOException {
+    try {
+      testHelper(false);
+      // Reaching this line means the fetcher swallowed the RM's exception.
+      Assert.fail("ApplicationNotFoundException expected when AHS is disabled");
+    } catch (ApplicationNotFoundException e) {
+      /* RM will not know of the app and Application History Service is disabled
+       * So we will not try to get the report from AHS and RM will throw
+       * ApplicationNotFoundException
+       */
+      // Compare by value; reference equality on Strings is not a valid check.
+      Assert.assertEquals(appNotFoundExceptionMsg, e.getMessage());
+    }
+    Mockito.verify(appManager, Mockito.times(1))
+        .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+    Assert.assertNull("HistoryManager should be null as AHS is disabled",
+        historyManager);
+  }
+
+  /**
+   * Fetcher that replaces the real AHS proxy with a Mockito mock and
+   * publishes it through the enclosing class's static historyManager field.
+   */
+  static class AppReportFetcherForTest extends AppReportFetcher {
+
+    public AppReportFetcherForTest(Configuration conf,
+        ApplicationClientProtocol acp) {
+      super(conf, acp);
+    }
+
+    /**
+     * Return a mock AHS whose getApplicationReport yields a mock response.
+     * @param conf ignored; no real connection is made.
+     * @return the mock stored in the enclosing class's historyManager field.
+     * @throws IOException if stubbing the mock fails.
+     */
+    @Override
+    protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
+        throws IOException {
+      GetApplicationReportResponse resp =
+          Mockito.mock(GetApplicationReportResponse.class);
+      historyManager = Mockito.mock(ApplicationHistoryProtocol.class);
+      try {
+        Mockito.when(historyManager.getApplicationReport(Mockito
+            .any(GetApplicationReportRequest.class))).thenReturn(resp);
+      } catch (YarnException e) {
+        // Surface a stubbing failure instead of printing it and carrying on
+        // with a half-configured mock.
+        throw new IOException(e);
+      }
+      return historyManager;
+    }
+  }
+}