You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by yu...@apache.org on 2015/08/31 23:11:38 UTC
ambari git commit: AMBARI-12910. Ambari Views Framework - need
support for accessing RM running under HA. (Dipayan Bhowmick via yusaku)
Repository: ambari
Updated Branches:
refs/heads/trunk ef44337d0 -> ba04e8df9
AMBARI-12910. Ambari Views Framework - need support for accessing RM running under HA. (Dipayan Bhowmick via yusaku)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ba04e8df
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ba04e8df
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ba04e8df
Branch: refs/heads/trunk
Commit: ba04e8df9385a3171adf5c3e296cf3782c6b93cc
Parents: ef44337
Author: Yusaku Sako <yu...@hortonworks.com>
Authored: Mon Aug 31 14:09:36 2015 -0700
Committer: Yusaku Sako <yu...@hortonworks.com>
Committed: Mon Aug 31 14:11:22 2015 -0700
----------------------------------------------------------------------
contrib/views/hive/src/main/resources/view.xml | 2 +-
.../ambari/view/utils/ambari/Services.java | 178 +++++++++---
.../ambari/view/utils/ambari/ServicesTest.java | 281 +++++++++++++++++++
3 files changed, 418 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba04e8df/contrib/views/hive/src/main/resources/view.xml
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/view.xml b/contrib/views/hive/src/main/resources/view.xml
index ed97213..ce0895b 100644
--- a/contrib/views/hive/src/main/resources/view.xml
+++ b/contrib/views/hive/src/main/resources/view.xml
@@ -181,7 +181,7 @@
<parameter>
<name>yarn.resourcemanager.url</name>
- <description>The URL to the YARN ResourceManager, used to provide YARN Application data.</description>
+ <description>The URL to the YARN ResourceManager, used to provide YARN Application data. If YARN ResourceManager HA is enabled, provide a comma separated list of URLs for all the Resource Managers.</description>
<label>YARN ResourceManager URL</label>
<placeholder>http://yarn.resourcemanager.address:8088</placeholder>
<cluster-config>yarn-site/yarn.resourcemanager.webapp.address</cluster-config>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba04e8df/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
----------------------------------------------------------------------
diff --git a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
index 0b49076..120e377 100644
--- a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
+++ b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,16 +19,17 @@
package org.apache.ambari.view.utils.ambari;
import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Utilities for specific Hadoop services and util functions for them
@@ -39,6 +40,12 @@ public class Services {
public static final String YARN_SITE = "yarn-site";
public static final String YARN_HTTP_POLICY = "yarn.http.policy";
public static final String YARN_RESOURCEMANAGER_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
+ private static final String YARN_RESOURCEMANAGER_HTTPS_KEY = "yarn.resourcemanager.webapp.https.address";
+ private static final String YARN_RESOURCEMANAGER_HTTP_KEY = "yarn.resourcemanager.webapp.address";
+ private static final String YARN_RESOURCEMANAGER_HA_RM_IDS_KEY = "yarn.resourcemanager.ha.rm-ids";
+ private static final String YARN_RESOURCEMANAGER_HTTP_HA_PARTIAL_KEY = "yarn.resourcemanager.webapp.address.";
+ private static final String YARN_RESOURCEMANAGER_HTTPS_HA_PARTIAL_KEY = "yarn.resourcemanager.webapp.https.address.";
+ public static final String RM_INFO_API_ENDPOINT = "/ws/v1/cluster/info";
private final AmbariApi ambariApi;
private ViewContext context;
@@ -60,36 +67,79 @@ public class Services {
String url;
if (ambariApi.isClusterAssociated()) {
- String protocol;
+ url = getRMUrlFromClusterConfig();
+ } else {
+ url = getRmUrlFromCustomConfig();
+ }
+ return removeTrailingSlash(url);
+ }
+
+ private String getRMUrlFromClusterConfig() {
+ String url;
+ String protocol;
- String haEnabled = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HA_ENABLED);
- String httpPolicy = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_HTTP_POLICY);
+ String haEnabled = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HA_ENABLED);
+ String httpPolicy = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_HTTP_POLICY);
+
+ if (!(HTTP_ONLY.equals(httpPolicy) || HTTPS_ONLY.equals(httpPolicy))) {
+ LOG.error(String.format("RA030 Unknown value %s of yarn-site/yarn.http.policy. HTTP_ONLY assumed.", httpPolicy));
+ httpPolicy = HTTP_ONLY;
+ }
+
+ if (haEnabled != null && haEnabled.equals("true")) {
+ String[] urls = getRMHAUrls(httpPolicy);
+ url = getActiveRMUrl(urls);
+ } else {
if (httpPolicy.equals(HTTPS_ONLY)) {
protocol = "https";
- url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, "yarn.resourcemanager.webapp.https.address");
-
+ url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTPS_KEY);
} else {
protocol = "http";
- url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, "yarn.resourcemanager.webapp.address");
- if (!httpPolicy.equals(HTTP_ONLY))
- LOG.error(String.format("RA030 Unknown value %s of yarn-site/yarn.http.policy. HTTP_ONLY assumed.", httpPolicy));
+ url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTP_KEY);
}
-
url = addProtocolIfMissing(url, protocol);
+ }
+ return url;
+ }
- if (haEnabled != null && haEnabled.equals("true")) {
- url = getActiveRMUrl(url);
+ private String[] getRMHAUrls(String httpPolicy) {
+ String haRmIds = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HA_RM_IDS_KEY);
+ String[] ids = haRmIds.split(",");
+ int index = 0;
+ String[] urls = new String[ids.length];
+ for (String id : ids) {
+ String url, protocol;
+ if (HTTPS_ONLY.equals(httpPolicy)) {
+ url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTPS_HA_PARTIAL_KEY + id);
+ protocol = "https";
+ } else {
+ url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTP_HA_PARTIAL_KEY + id);
+ protocol = "http";
}
- } else {
- url = context.getProperties().get("yarn.resourcemanager.url");
- if (!hasProtocol(url)) {
+
+ urls[index++] = addProtocolIfMissing(url.trim(), protocol);
+ }
+ return urls;
+ }
+
+ private String getRmUrlFromCustomConfig() {
+ // Comma separated list of URLs for HA and single URL for non HA
+ String resourceManagerUrls = context.getProperties().get("yarn.resourcemanager.url");
+ if (!StringUtils.isEmpty(resourceManagerUrls)) {
+ String[] urls = resourceManagerUrls.split(",");
+
+ if (!hasProtocol(urls)) {
throw new AmbariApiException(
- "RA070 View is not cluster associated. Resource Manager URL should contain protocol.");
+ "RA070 View is not cluster associated. All Resource Manager URL should contain protocol.");
}
+ return getActiveRMUrl(urls);
+ } else {
+ throw new AmbariApiException(
+ "RA070 View is not cluster associated. 'YARN ResourceManager URL' should be provided");
}
- return removeTrailingSlash(url);
}
+
private String removeTrailingSlash(String url) {
if (url.endsWith("/")) {
url = url.substring(0, url.length() - 1);
@@ -97,30 +147,60 @@ public class Services {
return url;
}
- public final Pattern refreshHeaderUrlPattern = Pattern.compile("^\\d+;\\s*url=(.*)$");
+ /**
+ * Returns the active RM URL. All RM URLs for RM HA are passed as an argument. This iterates over the list of RM URLs
+ * and queries each one's cluster info. Breaks out and returns the URL whose 'haState' field is "ACTIVE".
+ * If only one URL is passed, it is considered ACTIVE and returned. No API call is made in that case.
+ * @param urls array of all the RM Urls
+ * @return url of the active RM
+ */
+ private String getActiveRMUrl(String[] urls) {
+ if (urls.length == 1)
+ return urls[0].trim();
+ else {
+ for (String url : urls) {
+ url = url.trim();
+ if (isActiveUrl(url))
+ return url;
+ }
+ }
+ LOG.error("All ResourceManagers are not accessible or none seem to be active.");
+ throw new AmbariApiException("RA110 All ResourceManagers are not accessible or none seem to be active.");
+ }
/**
- * Returns active RM URL. Makes a request to RM passed as argument.
- * If response contains Refresh header then passed url was standby RM.
- * @param url url of random RM
- * @return url of active RM
+ * Queries RM API to check the haState.
+ * @param url Resource Manager root url
+ * @return true if haState returned is ACTIVE else false
*/
- private String getActiveRMUrl(String url) {
- String activeRMUrl = url;
+
+ private boolean isActiveUrl(String url) {
+ InputStream inputStream = null;
try {
- HttpURLConnection httpURLConnection = context.getURLConnectionProvider().
- getConnection(url, "GET", (String) null, new HashMap<String, String>());
- String refreshHeader = httpURLConnection.getHeaderField("Refresh");
- if (refreshHeader != null) { // we hit standby RM
- Matcher matcher = refreshHeaderUrlPattern.matcher(refreshHeader);
- if (matcher.find()) {
- activeRMUrl = matcher.group(1);
- }
- }
+ inputStream = context.getURLStreamProvider()
+ .readFrom(url + RM_INFO_API_ENDPOINT, "GET", (String) null, new HashMap<String, String>());
+ String response = IOUtils.toString(inputStream);
+ String haState = getHAStateFromRMResponse(response);
+
+ if (StringUtils.isNotEmpty(haState) && "ACTIVE".equals(haState))
+ return true;
+
} catch (IOException e) {
- throw new AmbariApiException("RA110 ResourceManager is not accessible");
+ LOG.error("Resource Manager : %s is not accessible. This cannot be a active RM. Returning false.");
+ } finally {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException e) { /* Nothing to do */ }
+ }
}
- return activeRMUrl;
+ return false;
+ }
+
+ private String getHAStateFromRMResponse(String response) {
+ JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
+ JSONObject clusterInfo = (JSONObject) jsonObject.get("clusterInfo");
+ return (String) clusterInfo.get("haState");
}
/**
@@ -145,14 +225,14 @@ public class Services {
host = context.getProperties().get("webhcat.hostname");
if (host == null || host.isEmpty()) {
throw new AmbariApiException(
- "RA080 Can't determine WebHCat hostname neither by associated cluster nor by webhcat.hostname property.");
+ "RA080 Can't determine WebHCat hostname neither by associated cluster nor by webhcat.hostname property.");
}
}
String port = context.getProperties().get("webhcat.port");
if (port == null || port.isEmpty()) {
throw new AmbariApiException(
- "RA090 Can't determine WebHCat port neither by associated cluster nor by webhcat.port property.");
+ "RA090 Can't determine WebHCat port neither by associated cluster nor by webhcat.port property.");
}
return String.format("http://%s:%s/templeton/v1", host, port);
@@ -165,6 +245,20 @@ public class Services {
return url;
}
+
+ /**
+ * Checks if all the urls in the array contain a protocol
+ * @param urls Array of urls
+ * @return true if all the urls contain protocol
+ */
+ public static boolean hasProtocol(String[] urls) {
+ for (String url : urls) {
+ if (!hasProtocol(url))
+ return false;
+ }
+ return true;
+ }
+
/**
* Checks if URL has the protocol
* @param url url
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba04e8df/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
----------------------------------------------------------------------
diff --git a/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java b/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
new file mode 100644
index 0000000..1950df8
--- /dev/null
+++ b/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.utils.ambari;
+
+
+import org.apache.ambari.view.URLStreamProvider;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.cluster.Cluster;
+import org.apache.commons.io.IOUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+public class ServicesTest extends EasyMockSupport {
+
+ private static final String HTTP_RM_URL1 = "http://c1.ambari.apache.org:8088";
+ private static final String HTTP_RM_URL2 = "http://c2.ambari.apache.org:8088";
+ private static final String HTTPS_RM_URL1 = "https://c1.ambari.apache.org:8088";
+ private static final String HTTPS_RM_URL2 = "https://c2.ambari.apache.org:8088";
+ private static final String RM_URL1_HOST_PORT = "c1.ambari.apache.org:8088";
+ private static final String RM_URL2_HOST_PORT = "c2.ambari.apache.org:8088";
+ private static final String RM_INFO_API_ENDPOINT = Services.RM_INFO_API_ENDPOINT;
+
+ @Test(expected = AmbariApiException.class)
+ public void shouldCheckForEmptyYarnRMUrlInCustomConfig() {
+ ViewContext viewContext = getViewContext(new HashMap<String, String>());
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+
+ replay(viewContext);
+
+ Services services = new Services(ambariApi, viewContext);
+ services.getRMUrl();
+
+
+ }
+
+ @Test(expected = AmbariApiException.class)
+ public void shouldCheckIfAllRMUrlsHaveProtocolInCustomConfig() {
+ Map<String, String> map = new HashMap<>();
+ map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + "," + RM_URL2_HOST_PORT);
+ ViewContext viewContext = getViewContext(map);
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+
+ replay(viewContext);
+
+ Services services = new Services(ambariApi, viewContext);
+ services.getRMUrl();
+ }
+
+ @Test
+ public void shouldReturnUrlIfSingleIsConfiguredInCustomConfig() {
+ Map<String, String> map = new HashMap<>();
+ map.put("yarn.resourcemanager.url", HTTP_RM_URL1);
+ ViewContext viewContext = getViewContext(map);
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+
+ replay(viewContext);
+
+ Services services = new Services(ambariApi, viewContext);
+ assertEquals(HTTP_RM_URL1, services.getRMUrl());
+ }
+
+ @Test
+ public void shouldConnectToFirstUrlWhenMultipleRMUrlIsConfiguredInCustomConfig() throws IOException {
+ Map<String, String> map = new HashMap<>();
+ map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+ ViewContext viewContext = getViewContext(map);
+
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+ InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+ expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider);
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+ replayAll();
+
+ Services services = new Services(ambariApi, viewContext);
+ assertEquals(HTTP_RM_URL1, services.getRMUrl());
+
+ }
+
+ @Test
+ public void shouldConnectToSecondUrlWhenTheFirstURLTimesOut() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+ ViewContext viewContext = getViewContext(map);
+
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+ InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+ expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+ replayAll();
+
+ Services services = new Services(ambariApi, viewContext);
+ assertEquals(HTTP_RM_URL2, services.getRMUrl());
+
+ }
+
+ @Test(expected = AmbariApiException.class)
+ public void shouldThrowExceptionWhenAllUrlCannotBeReached() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+ ViewContext viewContext = getViewContext(map);
+
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+ expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+
+ replayAll();
+
+ Services services = new Services(ambariApi, viewContext);
+ services.getRMUrl();
+ }
+
+ @Test
+ public void shouldReturnActiveRMUrlWhenConnectingToStandby() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+ ViewContext viewContext = getViewContext(map);
+
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+ InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"STANDBY\"}}");
+
+ expect(ambariApi.isClusterAssociated()).andReturn(false);
+ expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+ InputStream inputStreamActive = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStreamActive);
+
+ replayAll();
+
+ Services services = new Services(ambariApi, viewContext);
+ assertEquals(HTTP_RM_URL2, services.getRMUrl());
+
+ verify(urlStreamProvider);
+
+ }
+
+ @Test
+ public void shouldConnectToRMConfiguredInClusterMode() throws Exception {
+ ViewContext viewContext = getViewContext(new HashMap<String, String>());
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ Cluster cluster = createNiceMock(Cluster.class);
+
+ expect(ambariApi.isClusterAssociated()).andReturn(true).anyTimes();
+ setClusterExpectation(cluster, "HTTP_ONLY");
+ expect(ambariApi.getCluster()).andReturn(cluster).anyTimes();
+
+ replayAll();
+
+ Services services = new Services(ambariApi, viewContext);
+ assertEquals(HTTP_RM_URL1, services.getRMUrl());
+
+ reset(cluster);
+ setClusterExpectation(cluster, "HTTPS_ONLY");
+ replay(cluster);
+
+ assertEquals(HTTPS_RM_URL2, services.getRMUrl());
+
+ reset(cluster);
+ setClusterExpectation(cluster, "HTTPS_ONLY_XYZ");
+ replay(cluster);
+
+ assertEquals(HTTP_RM_URL1, services.getRMUrl());
+ }
+
+ @Test
+ public void shouldFetchRMUrlsWhileHAEnabledInClusterMode() throws Exception {
+ ViewContext viewContext = getViewContext(new HashMap<String, String>());
+ AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+ Cluster cluster = createNiceMock(Cluster.class);
+ URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+ Services services = new Services(ambariApi, viewContext);
+
+ InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+
+
+
+ expect(ambariApi.isClusterAssociated()).andReturn(true).anyTimes();
+ setClusterExpectationInHA(cluster, "HTTP_ONLY");
+ expect(ambariApi.getCluster()).andReturn(cluster).anyTimes();
+ expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+ replayAll();
+
+ assertEquals(HTTP_RM_URL1, services.getRMUrl());
+
+ reset(cluster, urlStreamProvider);
+ setClusterExpectationInHA(cluster, "HTTP_ONLY");
+ inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+ expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+ replay(cluster, urlStreamProvider);
+
+ assertEquals(HTTP_RM_URL2, services.getRMUrl());
+
+ reset(cluster, urlStreamProvider);
+ setClusterExpectationInHA(cluster, "HTTPS_ONLY");
+ inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+ expect(urlStreamProvider.readFrom(eq(HTTPS_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+ replay(cluster, urlStreamProvider);
+
+ assertEquals(HTTPS_RM_URL1, services.getRMUrl());
+
+ reset(cluster, urlStreamProvider);
+ setClusterExpectationInHA(cluster, "HTTPS_ONLY");
+ inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+ expect(urlStreamProvider.readFrom(eq(HTTPS_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+ expect(urlStreamProvider.readFrom(eq(HTTPS_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"), anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+ replay(cluster, urlStreamProvider);
+
+ assertEquals(HTTPS_RM_URL2, services.getRMUrl());
+ }
+
+ private void setClusterExpectation(Cluster cluster, String httpPolicy) {
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.ha.enabled")).andReturn("false");
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.http.policy")).andReturn(httpPolicy);
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.address")).andReturn(RM_URL1_HOST_PORT);
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.https.address")).andReturn(RM_URL2_HOST_PORT);
+ }
+
+ private void setClusterExpectationInHA(Cluster cluster, String httpPolicy) {
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.ha.enabled")).andReturn("true");
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.http.policy")).andReturn(httpPolicy);
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.ha.rm-ids")).andReturn("rm1,rm2");
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.address.rm1")).andReturn(RM_URL1_HOST_PORT);
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.address.rm2")).andReturn(RM_URL2_HOST_PORT);
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.https.address.rm1")).andReturn(RM_URL1_HOST_PORT);
+ expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.https.address.rm2")).andReturn(RM_URL2_HOST_PORT);
+ }
+
+ private ViewContext getViewContext(Map<String, String> map) {
+ ViewContext viewContextMock = createNiceMock(ViewContext.class);
+ expect(viewContextMock.getProperties()).andReturn(map);
+ return viewContextMock;
+ }
+}
\ No newline at end of file