You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/07/07 14:49:25 UTC
[11/29] ambari git commit: AMBARI-21370: Support VIPs instead of Host
Names (jluniya)
AMBARI-21370: Support VIPs instead of Host Names (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fde1cced
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fde1cced
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fde1cced
Branch: refs/heads/branch-feature-AMBARI-21348
Commit: fde1cced9dbbbb46bc6812b0336403bd6622a513
Parents: 52980df
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Thu Jun 29 07:17:24 2017 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Thu Jun 29 07:26:30 2017 -0700
----------------------------------------------------------------------
.../ambari_agent/AlertSchedulerHandler.py | 10 +-
.../python/ambari_agent/alerts/base_alert.py | 8 +-
.../python/ambari_agent/alerts/port_alert.py | 107 +++++++++++--------
.../ambari_agent/TestAlertSchedulerHandler.py | 17 +--
.../server/agent/AlertDefinitionCommand.java | 7 +-
.../ambari/server/agent/HeartBeatHandler.java | 4 +-
.../internal/AbstractProviderModule.java | 49 +++++++--
.../server/controller/jmx/JMXHostProvider.java | 15 +++
.../controller/jmx/JMXPropertyProvider.java | 25 +++++
.../org/apache/ambari/server/state/Cluster.java | 8 ++
.../server/state/alert/AlertDefinitionHash.java | 14 +--
.../server/state/cluster/ClusterImpl.java | 18 ++++
.../2.1.0.2.0/package/scripts/hdfs_namenode.py | 4 +-
.../package/scripts/namenode_upgrade.py | 2 +-
.../2.1.0.2.0/package/scripts/params_linux.py | 4 +
.../metrics/JMXPropertyProviderTest.java | 9 ++
.../state/alerts/AlertDefinitionHashTest.java | 4 +-
.../configs/ha_bootstrap_standby_node.json | 2 +-
...ha_bootstrap_standby_node_initial_start.json | 2 +-
...dby_node_initial_start_dfs_nameservices.json | 2 +-
20 files changed, 227 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 6c1d29c..55c3d6e 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -283,6 +283,7 @@ class AlertSchedulerHandler():
for command_json in all_commands:
clusterName = '' if not 'clusterName' in command_json else command_json['clusterName']
hostName = '' if not 'hostName' in command_json else command_json['hostName']
+ publicHostName = '' if not 'publicHostName' in command_json else command_json['publicHostName']
clusterHash = None if not 'hash' in command_json else command_json['hash']
# cache the cluster and cluster hash after loading the JSON
@@ -291,7 +292,7 @@ class AlertSchedulerHandler():
self._cluster_hashes[clusterName] = clusterHash
for definition in command_json['alertDefinitions']:
- alert = self.__json_to_callable(clusterName, hostName, definition)
+ alert = self.__json_to_callable(clusterName, hostName, publicHostName, definition)
if alert is None:
continue
@@ -303,7 +304,7 @@ class AlertSchedulerHandler():
return definitions
- def __json_to_callable(self, clusterName, hostName, json_definition):
+ def __json_to_callable(self, clusterName, hostName, publicHostName, json_definition):
"""
converts the json that represents all aspects of a definition
and makes an object that extends BaseAlert that is used for individual
@@ -336,7 +337,7 @@ class AlertSchedulerHandler():
alert = RecoveryAlert(json_definition, source, self.config, self.recovery_manger)
if alert is not None:
- alert.set_cluster(clusterName, hostName)
+ alert.set_cluster(clusterName, hostName, publicHostName)
except Exception, exception:
logger.exception("[AlertScheduler] Unable to load an invalid alert definition. It will be skipped.")
@@ -402,8 +403,9 @@ class AlertSchedulerHandler():
clusterName = '' if not 'clusterName' in execution_command else execution_command['clusterName']
hostName = '' if not 'hostName' in execution_command else execution_command['hostName']
+ publicHostName = '' if not 'publicHostName' in execution_command else execution_command['publicHostName']
- alert = self.__json_to_callable(clusterName, hostName, alert_definition)
+ alert = self.__json_to_callable(clusterName, hostName, publicHostName, alert_definition)
if alert is None:
continue
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index 7f3b2a5..add29fc 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -46,6 +46,7 @@ class BaseAlert(object):
self.alert_source_meta = alert_source_meta
self.cluster_name = ''
self.host_name = ''
+ self.public_host_name = ''
self.config = config
def interval(self):
@@ -86,10 +87,13 @@ class BaseAlert(object):
self.cluster_configuration = cluster_configuration
- def set_cluster(self, cluster_name, host_name):
+ def set_cluster(self, cluster_name, host_name, public_host_name = None):
""" sets cluster information for the alert """
self.cluster_name = cluster_name
self.host_name = host_name
+ self.public_host_name = host_name
+ if public_host_name:
+ self.public_host_name = public_host_name
def _get_alert_meta_value_safely(self, meta_key):
@@ -452,7 +456,7 @@ class BaseAlert(object):
# get the host for dfs.namenode.http-address.c1ha.nn1 and see if it's
# this host
value = self._get_configuration_value(key)
- if value is not None and self.host_name in value:
+ if value is not None and (self.host_name in value or self.public_host_name in value):
return AlertUri(uri=value, is_ssl_enabled=is_ssl_enabled)
return None
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
index 1e32718..02cc91c 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -91,7 +91,9 @@ class PortAlert(BaseAlert):
# if not parameterized, this will return the static value
uri_value = self._get_configuration_value(self.uri)
+ host_not_specified = False
if uri_value is None:
+ host_not_specified = True
uri_value = self.host_name
logger.debug("[Alert][{0}] Setting the URI to this host since it wasn't specified".format(
self.get_name()))
@@ -112,6 +114,16 @@ class PortAlert(BaseAlert):
host = BaseAlert.get_host_from_url(uri_value)
if host is None or host == "localhost" or host == "0.0.0.0":
host = self.host_name
+ host_not_specified = True
+
+ hosts = [host]
+ # If the host was not specified in the URI (so we fell back to the current host name),
+ # also add the public host name as a fallback.
+ if host_not_specified and host.lower() == self.host_name.lower() \
+ and self.host_name.lower() != self.public_host_name.lower():
+ hosts.append(self.public_host_name)
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("[Alert][{0}] List of hosts = {1}".format(self.get_name(), hosts))
try:
port = int(get_port_from_url(uri_value))
@@ -122,51 +134,56 @@ class PortAlert(BaseAlert):
port = self.default_port
-
- if logger.isEnabledFor(logging.DEBUG):
- logger.debug("[Alert][{0}] Checking {1} on port {2}".format(
- self.get_name(), host, str(port)))
-
- s = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(self.critical_timeout)
-
- if OSCheck.is_windows_family():
- # on windows 0.0.0.0 is invalid address to connect but on linux it resolved to 127.0.0.1
- host = resolve_address(host)
-
- start_time = time.time()
- s.connect((host, port))
- if self.socket_command is not None:
- s.sendall(self.socket_command)
- data = s.recv(1024)
- if self.socket_command_response is not None and data != self.socket_command_response:
- raise Exception("Expected response {0}, Actual response {1}".format(
- self.socket_command_response, data))
- end_time = time.time()
- milliseconds = end_time - start_time
- seconds = milliseconds / 1000.0
-
- # not sure why this happens sometimes, but we don't always get a
- # socket exception if the connect() is > than the critical threshold
- if seconds >= self.critical_timeout:
- return (self.RESULT_CRITICAL, ['Socket Timeout', host, port])
-
- result = self.RESULT_OK
- if seconds >= self.warning_timeout:
- result = self.RESULT_WARNING
-
- return (result, [seconds, port])
- except Exception as e:
- return (self.RESULT_CRITICAL, [str(e), host, port])
- finally:
- if s is not None:
- try:
- s.close()
- except:
- # no need to log a close failure
- pass
+ exceptions = []
+
+ for host in hosts:
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("[Alert][{0}] Checking {1} on port {2}".format(
+ self.get_name(), host, str(port)))
+
+ s = None
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(self.critical_timeout)
+
+ if OSCheck.is_windows_family():
+ # on windows 0.0.0.0 is invalid address to connect but on linux it resolved to 127.0.0.1
+ host = resolve_address(host)
+
+ start_time = time.time()
+ s.connect((host, port))
+ if self.socket_command is not None:
+ s.sendall(self.socket_command)
+ data = s.recv(1024)
+ if self.socket_command_response is not None and data != self.socket_command_response:
+ raise Exception("Expected response {0}, Actual response {1}".format(
+ self.socket_command_response, data))
+ end_time = time.time()
+ milliseconds = end_time - start_time
+ seconds = milliseconds / 1000.0
+
+ # not sure why this happens sometimes, but we don't always get a
+ # socket exception if the connect() is > than the critical threshold
+ if seconds >= self.critical_timeout:
+ return (self.RESULT_CRITICAL, ['Socket Timeout', host, port])
+
+ result = self.RESULT_OK
+ if seconds >= self.warning_timeout:
+ result = self.RESULT_WARNING
+
+ return (result, [seconds, port])
+ except Exception as e:
+ exceptions.append(e)
+ finally:
+ if s is not None:
+ try:
+ s.close()
+ except:
+ # no need to log a close failure
+ pass
+
+ if exceptions:
+ return (self.RESULT_CRITICAL, [str(exceptions[0]), hosts[0], port])
def _get_reporting_text(self, state):
'''
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
index d1d27ef..fbcd33f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
@@ -70,7 +70,7 @@ class TestAlertSchedulerHandler(TestCase):
}
}
- callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
+ callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition))
self.assertTrue(callable_result is not None)
self.assertTrue(isinstance(callable_result, MetricAlert))
@@ -85,7 +85,7 @@ class TestAlertSchedulerHandler(TestCase):
}
}
- callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
+ callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition))
self.assertTrue(callable_result is not None)
self.assertTrue(isinstance(callable_result, AmsAlert))
@@ -100,7 +100,7 @@ class TestAlertSchedulerHandler(TestCase):
}
scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None)
- callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
+ callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition))
self.assertTrue(callable_result is not None)
self.assertTrue(isinstance(callable_result, PortAlert))
@@ -116,7 +116,7 @@ class TestAlertSchedulerHandler(TestCase):
}
scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None)
- callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
+ callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition))
self.assertTrue(callable_result is not None)
self.assertTrue(isinstance(callable_result, WebAlert))
@@ -131,7 +131,7 @@ class TestAlertSchedulerHandler(TestCase):
}
scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None)
- callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
+ callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition))
self.assertTrue(callable_result is None)
@@ -174,6 +174,7 @@ class TestAlertSchedulerHandler(TestCase):
{
'clusterName': 'cluster',
'hostName': 'host',
+ 'publicHostName' : 'host',
'alertDefinition': {
'name': 'alert1'
}
@@ -191,7 +192,7 @@ class TestAlertSchedulerHandler(TestCase):
scheduler.execute_alert(execution_commands)
- scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', {'name': 'alert1'})
+ scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', 'host', {'name': 'alert1'})
self.assertTrue(alert_mock.collect.called)
def test_execute_alert_from_extension(self):
@@ -199,6 +200,7 @@ class TestAlertSchedulerHandler(TestCase):
{
'clusterName': 'cluster',
'hostName': 'host',
+ 'publicHostName' : 'host',
'alertDefinition': {
'name': 'alert1'
}
@@ -216,7 +218,7 @@ class TestAlertSchedulerHandler(TestCase):
scheduler.execute_alert(execution_commands)
- scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', {'name': 'alert1'})
+ scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', 'host', {'name': 'alert1'})
self.assertTrue(alert_mock.collect.called)
def test_load_definitions(self):
@@ -245,6 +247,7 @@ class TestAlertSchedulerHandler(TestCase):
{
'clusterName': 'cluster',
'hostName': 'host',
+ 'publicHostName' : 'host',
'alertDefinition': {
'name': 'alert1'
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
index 4d2e048..d603f60 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
@@ -46,6 +46,9 @@ public class AlertDefinitionCommand extends AgentCommand {
@SerializedName("hostName")
private final String m_hostName;
+ @SerializedName("publicHostName")
+ private final String m_publicHostName;
+
@SerializedName("hash")
private final String m_hash;
@@ -61,17 +64,19 @@ public class AlertDefinitionCommand extends AgentCommand {
* @param clusterName
* the name of the cluster this response is for (
* @param hostName
+ * @param publicHostName
* @param hash
* @param definitions
*
* @see AlertDefinitionHash
*/
- public AlertDefinitionCommand(String clusterName, String hostName,
+ public AlertDefinitionCommand(String clusterName, String hostName, String publicHostName,
String hash, List<AlertDefinition> definitions) {
super(AgentCommandType.ALERT_DEFINITION_COMMAND);
m_clusterName = clusterName;
m_hostName = hostName;
+ m_publicHostName = publicHostName;
m_hash = hash;
m_definitions = definitions;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index fd43de5..d3ea24b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -617,8 +617,10 @@ public class HeartBeatHandler {
clusterName, hostname);
String hash = alertDefinitionHash.getHash(clusterName, hostname);
+ Host host = cluster.getHost(hostname);
+ String publicHostName = host == null? hostname : host.getPublicHostName();
AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
- hostname, hash, definitions);
+ hostname, publicHostName, hash, definitions);
command.addConfigs(configHelper, cluster);
commands.add(command);
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
index 3eae675..74f9953 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
@@ -65,6 +65,7 @@ import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.Service;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -458,6 +459,12 @@ public abstract class AbstractProviderModule implements ProviderModule,
}
@Override
+ public String getPublicHostName(String clusterName, String hostName) {
+ Host host = getHost(clusterName, hostName);
+ return host == null? hostName : host.getPublicHostName();
+ }
+
+ @Override
public Set<String> getHostNames(String clusterName, String componentName) {
Set<String> hosts = null;
try {
@@ -471,6 +478,21 @@ public abstract class AbstractProviderModule implements ProviderModule,
}
@Override
+ public Host getHost(String clusterName, String hostName) {
+ Host host = null;
+ try {
+ Cluster cluster = managementController.getClusters().getCluster(clusterName);
+ if(cluster != null) {
+ host = cluster.getHost(hostName);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception in getting host info for jmx metrics: ", e);
+ }
+ return host;
+ }
+
+
+ @Override
public boolean isCollectorComponentLive(String clusterName, MetricsService service) throws SystemException {
final String collectorHostName = getCollectorHostName(clusterName, service);
@@ -527,12 +549,14 @@ public abstract class AbstractProviderModule implements ProviderModule,
serviceConfigTypes.get(service)
);
- Map<String, String[]> componentPortsProperties = new HashMap<String, String[]>();
+ String publicHostName = getPublicHostName(clusterName, hostName);
+ Map<String, String[]> componentPortsProperties = new HashMap<>();
componentPortsProperties.put(
componentName,
getPortProperties(service,
componentName,
hostName,
+ publicHostName,
configProperties,
httpsEnabled
)
@@ -552,7 +576,7 @@ public abstract class AbstractProviderModule implements ProviderModule,
}
}
- initRpcSuffixes(clusterName, componentName, configType, currVersion, hostName);
+ initRpcSuffixes(clusterName, componentName, configType, currVersion, hostName, publicHostName);
}
} catch (Exception e) {
LOG.error("Exception initializing jmx port maps. ", e);
@@ -574,8 +598,8 @@ public abstract class AbstractProviderModule implements ProviderModule,
}
/**
- * Computes properties that contains proper port for {@code componentName} on {@code hostName}. Must contain custom logic
- * for different configurations(like NAMENODE HA).
+ * Computes the properties that contain the proper port for {@code componentName} on {@code hostName}.
+ * Must contain custom logic for different configurations (like NAMENODE HA).
* @param service service type
* @param componentName component name
* @param hostName host which contains requested component
@@ -583,16 +607,20 @@ public abstract class AbstractProviderModule implements ProviderModule,
* @param httpsEnabled indicates if https enabled for component
* @return property name that contain port for {@code componentName} on {@code hostName}
*/
- String[] getPortProperties(Service.Type service, String componentName, String hostName, Map<String, Object> properties, boolean httpsEnabled) {
+ String[] getPortProperties(Service.Type service, String componentName,
+ String hostName, String publicHostName, Map<String, Object> properties, boolean httpsEnabled) {
componentName = httpsEnabled ? componentName + "-HTTPS" : componentName;
if(componentName.startsWith("NAMENODE") && properties.containsKey("dfs.internal.nameservices")) {
componentName += "-HA";
- return getNamenodeHaProperty(properties, serviceDesiredProperties.get(service).get(componentName), hostName);
+ return getNamenodeHaProperty(
+ properties, serviceDesiredProperties.get(service).get(componentName), hostName, publicHostName);
}
return serviceDesiredProperties.get(service).get(componentName);
}
- private String[] getNamenodeHaProperty(Map<String, Object> properties, String pattern[], String hostName) {
+ private String[] getNamenodeHaProperty(Map<String, Object> properties, String pattern[],
+ String hostName, String publicHostName) {
+
// iterate over nameservices and namenodes, to find out namenode http(s) property for concrete host
for(String nameserviceId : ((String)properties.get("dfs.internal.nameservices")).split(",")) {
if(properties.containsKey("dfs.ha.namenodes."+nameserviceId)) {
@@ -604,7 +632,8 @@ public abstract class AbstractProviderModule implements ProviderModule,
);
if (properties.containsKey(propertyName)) {
String propertyValue = (String)properties.get(propertyName);
- if (propertyValue.split(":")[0].equals(hostName)) {
+ String propHostName = propertyValue.split(":")[0];
+ if (propHostName.equals(hostName) || propHostName.equals(publicHostName)) {
return new String[] {propertyName};
}
}
@@ -1183,7 +1212,7 @@ public abstract class AbstractProviderModule implements ProviderModule,
private void initRpcSuffixes(String clusterName, String componentName,
String config, String configVersion,
- String hostName)
+ String hostName, String publicHostName)
throws Exception {
if (jmxDesiredRpcSuffixProperties.containsKey(componentName)) {
Map<String, Map<String, String>> componentToPortsMap;
@@ -1211,7 +1240,7 @@ public abstract class AbstractProviderModule implements ProviderModule,
keys = jmxDesiredRpcSuffixProperties.get(componentName);
Map<String, String[]> stringMap = jmxDesiredRpcSuffixProperties.get(componentName);
for (String tag: stringMap.keySet()) {
- keys.put(tag, getNamenodeHaProperty(configProperties, stringMap.get(tag), hostName));
+ keys.put(tag, getNamenodeHaProperty(configProperties, stringMap.get(tag), hostName, publicHostName));
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
index 9f913b0..e3ce66d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
@@ -18,6 +18,11 @@
package org.apache.ambari.server.controller.jmx;
import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.state.Host;
+
+import java.util.Set;
+
+import org.apache.ambari.server.controller.spi.SystemException;
import java.util.Set;
@@ -26,6 +31,8 @@ import java.util.Set;
*/
public interface JMXHostProvider {
+ String getPublicHostName(String clusterName, String hostName);
+
/**
* Get the JMX host names for the given cluster name and component name.
*
@@ -38,6 +45,14 @@ public interface JMXHostProvider {
public Set<String> getHostNames(String clusterName, String componentName);
/**
+ * Get cluster host info given the host name
+ * @param clusterName
+ * @param hostName the host name
+ * @return the host info {@link Host}
+ */
+ Host getHost(String clusterName, String hostName);
+
+ /**
* Get the port for the specified cluster name and component.
*
* @param clusterName the cluster name
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
index cbc15cb..867eaea 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
@@ -40,6 +40,7 @@ import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.SystemException;
import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.services.MetricsRetrievalService;
import org.apache.ambari.server.state.services.MetricsRetrievalService.MetricSourceType;
import org.slf4j.Logger;
@@ -254,6 +255,8 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
for (String hostName : hostNames) {
try {
String port = getPort(clusterName, componentName, hostName, httpsEnabled);
+ String publicHostName = jmxHostProvider.getPublicHostName(clusterName, hostName);
+
if (port == null) {
LOG.warn("Unable to get JMX metrics. No port value for " + componentName);
return resource;
@@ -268,6 +271,17 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
// check to see if there is a cached value and use it if there is
JMXMetricHolder jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(jmxUrl);
+ if( jmxMetricHolder == null && !hostName.equalsIgnoreCase(publicHostName)) {
+ // build the URL using public host name
+ String publicJmxUrl = getSpec(protocol, publicHostName, port, "/jmx");
+
+ // always submit a request to cache the latest data
+ metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, publicJmxUrl);
+
+ // check to see if there is a cached value and use it if there is
+ jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(publicJmxUrl);
+ }
+
// if the ticket becomes invalid (timeout) then bail out
if (!ticket.isValid()) {
return resource;
@@ -290,6 +304,17 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, adHocUrl);
JMXMetricHolder adHocJMXMetricHolder = metricsRetrievalService.getCachedJMXMetric(adHocUrl);
+ if( adHocJMXMetricHolder == null && !hostName.equalsIgnoreCase(publicHostName)) {
+ // build the adhoc URL using public host name
+ String publicAdHocUrl = getSpec(protocol, publicHostName, port, queryURL);
+
+ // always submit a request to cache the latest data
+ metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, publicAdHocUrl);
+
+ // check to see if there is a cached value and use it if there is
+ adHocJMXMetricHolder = metricsRetrievalService.getCachedJMXMetric(publicAdHocUrl);
+ }
+
// if the ticket becomes invalid (timeout) then bail out
if (!ticket.isValid()) {
return resource;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 56c2b36..885704e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -127,6 +127,14 @@ public interface Cluster {
*/
Set<String> getHosts(String serviceName, String componentName);
+ /**
+ * Get specific host info using host name.
+ *
+ * @param hostName the host name
+ * @return Host info {@link Host}
+ */
+ Host getHost(String hostName);
+
/**
* Adds schs to cluster AND persists them
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
index 57181e5..f9ac28d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
@@ -465,7 +465,7 @@ public class AlertDefinitionHash {
hostNames.add(host.getHostName());
}
- enqueueAgentCommands(clusterName, hostNames);
+ enqueueAgentCommands(cluster, clusterName, hostNames);
} catch (AmbariException ae) {
LOG.error("Unable to lookup cluster for alert definition commands", ae);
}
@@ -487,15 +487,16 @@ public class AlertDefinitionHash {
*/
public void enqueueAgentCommands(long clusterId, Collection<String> hosts) {
String clusterName = null;
+ Cluster cluster = null;
try {
- Cluster cluster = m_clusters.get().getClusterById(clusterId);
+ cluster = m_clusters.get().getClusterById(clusterId);
clusterName = cluster.getClusterName();
} catch (AmbariException ae) {
LOG.error("Unable to lookup cluster for alert definition commands", ae);
}
- enqueueAgentCommands(clusterName, hosts);
+ enqueueAgentCommands(cluster, clusterName, hosts);
}
/**
@@ -512,7 +513,7 @@ public class AlertDefinitionHash {
* @param hosts
* the hosts to push {@link AlertDefinitionCommand}s for.
*/
- private void enqueueAgentCommands(String clusterName, Collection<String> hosts) {
+ private void enqueueAgentCommands(Cluster cluster, String clusterName, Collection<String> hosts) {
if (null == clusterName) {
LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
return;
@@ -530,11 +531,12 @@ public class AlertDefinitionHash {
String hash = getHash(clusterName, hostName);
+ Host host = cluster.getHost(hostName);
+ String publicHostName = host == null? hostName : host.getPublicHostName();
AlertDefinitionCommand command = new AlertDefinitionCommand(
- clusterName, hostName, hash, definitions);
+ clusterName, hostName, publicHostName, hash, definitions);
try {
- Cluster cluster = m_clusters.get().getCluster(clusterName);
command.addConfigs(m_configHelper.get(), cluster);
} catch (AmbariException ae) {
LOG.warn("Unable to add configurations to alert definition command",
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index a9e33f0..c9ed288 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -2930,6 +2930,24 @@ public class ClusterImpl implements Cluster {
}
@Override
+ public Host getHost(final String hostName) {
+ if (StringUtils.isEmpty(hostName)) {
+ return null;
+ }
+
+ Collection<Host> hosts = getHosts();
+ if(hosts != null) {
+ for (Host host : hosts) {
+ String hostString = host.getHostName();
+ if(hostName.equalsIgnoreCase(hostString)) {
+ return host;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
public Collection<Host> getHosts() {
Map<String, Host> hosts;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
index aa34dc0..28036cf 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
@@ -117,7 +117,7 @@ def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None,
if params.dfs_ha_enabled and \
params.dfs_ha_namenode_standby is not None and \
- params.hostname == params.dfs_ha_namenode_standby:
+ (params.hostname == params.dfs_ha_namenode_standby or params.public_hostname == params.dfs_ha_namenode_standby):
# if the current host is the standby NameNode in an HA deployment
# run the bootstrap command, to start the NameNode in standby mode
# this requires that the active NameNode is already up and running,
@@ -330,7 +330,7 @@ def format_namenode(force=None):
)
else:
if params.dfs_ha_namenode_active is not None and \
- params.hostname == params.dfs_ha_namenode_active:
+ (params.hostname == params.dfs_ha_namenode_active or params.public_hostname == params.dfs_ha_namenode_active):
# check and run the format command in the HA deployment scenario
# only format the "active" namenode in an HA deployment
if force:
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
index f683dcc..14d6ce2 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
@@ -47,7 +47,7 @@ def prepare_upgrade_check_for_previous_dir():
if params.dfs_ha_enabled:
namenode_ha = NamenodeHAState()
- if namenode_ha.is_active(params.hostname):
+ if namenode_ha.is_active(params.hostname) or namenode_ha.is_active(params.public_hostname):
Logger.info("NameNode High Availability is enabled and this is the Active NameNode.")
problematic_previous_namenode_dirs = set()
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
index fc87896..f98aafa 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
@@ -168,6 +168,7 @@ klist_path_local = get_klist_path(default('/configurations/kerberos-env/executab
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
#hosts
hostname = config["hostname"]
+public_hostname = config["public_hostname"]
rm_host = default("/clusterHostInfo/rm_host", [])
slave_hosts = default("/clusterHostInfo/slave_hosts", [])
oozie_servers = default("/clusterHostInfo/oozie_server", [])
@@ -306,6 +307,9 @@ if dfs_ha_enabled:
if hostname.lower() in nn_host.lower():
namenode_id = nn_id
namenode_rpc = nn_host
+ elif public_hostname.lower() in nn_host.lower():
+ namenode_id = nn_id
+ namenode_rpc = nn_host
# With HA enabled namenode_address is recomputed
namenode_address = format('hdfs://{dfs_ha_nameservices}')
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
index 2225997..0dde387 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
@@ -53,6 +53,7 @@ import org.apache.ambari.server.security.authorization.AuthorizationException;
import org.apache.ambari.server.security.authorization.AuthorizationHelperInitializer;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.services.MetricsRetrievalService;
import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor;
import org.junit.After;
@@ -604,11 +605,19 @@ public class JMXPropertyProviderTest {
this.unknownPort = unknownPort;
}
+ @Override public String getPublicHostName(final String clusterName, final String hostName) {
+ return null;
+ }
+
@Override
public Set<String> getHostNames(String clusterName, String componentName) {
return null;
}
+ @Override public Host getHost(final String clusterName, final String hostName) {
+ return null;
+ }
+
@Override
public String getPort(String clusterName, String componentName, String hostName, boolean httpsEnabled) throws SystemException {
return getPort(clusterName, componentName, hostName);
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java
index 7a24966..4024343 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java
@@ -379,10 +379,10 @@ public class AlertDefinitionHashTest extends TestCase {
ActionQueue actionQueue = m_injector.getInstance(ActionQueue.class);
AlertDefinitionCommand definitionCommand1 = new AlertDefinitionCommand(
- CLUSTERNAME, HOSTNAME, "12345", null);
+ CLUSTERNAME, HOSTNAME, HOSTNAME, "12345", null);
AlertDefinitionCommand definitionCommand2 = new AlertDefinitionCommand(
- CLUSTERNAME, "anotherHost", "67890", null);
+ CLUSTERNAME, "anotherHost", "anotherHost", "67890", null);
AlertExecutionCommand executionCommand = new AlertExecutionCommand(
CLUSTERNAME, HOSTNAME, null);
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json
index 96f4d9d..df09021 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json
+++ b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json
@@ -36,7 +36,7 @@
"script_type": "PYTHON"
},
"taskId": 93,
- "public_hostname": "c6401.ambari.apache.org",
+ "public_hostname": "c6402.ambari.apache.org",
"configurations": {
"mapred-site": {
"mapreduce.jobhistory.address": "c6402.ambari.apache.org:10020",
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json
index de2742f..a0a8f36 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json
+++ b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json
@@ -37,7 +37,7 @@
"phase": "INITIAL_START"
},
"taskId": 93,
- "public_hostname": "c6401.ambari.apache.org",
+ "public_hostname": "c6402.ambari.apache.org",
"configurations": {
"mapred-site": {
"mapreduce.jobhistory.address": "c6402.ambari.apache.org:10020",
http://git-wip-us.apache.org/repos/asf/ambari/blob/fde1cced/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json
index ba0fa8f..a3176bd 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json
+++ b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json
@@ -37,7 +37,7 @@
"phase": "INITIAL_START"
},
"taskId": 93,
- "public_hostname": "c6401.ambari.apache.org",
+ "public_hostname": "c6402.ambari.apache.org",
"configurations": {
"mapred-site": {
"mapreduce.jobhistory.address": "c6402.ambari.apache.org:10020",