You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/04/03 12:29:13 UTC
[ambari] branch trunk updated: AMBARI-23434. Fix stack issues in
HDFS to support Namenode Federation setup (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0824ed2 AMBARI-23434. Fix stack issues in HDFS to support Namenode Federation setup (aonishuk)
0824ed2 is described below
commit 0824ed2c7b8699f561ff18ce0d5c4508cfe65791
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Apr 3 15:11:06 2018 +0300
AMBARI-23434. Fix stack issues in HDFS to support Namenode Federation setup (aonishuk)
---
.../resource_management/core/providers/system.py | 3 +-
.../main/python/resource_management/core/shell.py | 12 ++---
.../libraries/functions/namenode_ha_utils.py | 37 +++++++++++----
.../libraries/providers/hdfs_resource.py | 10 +++-
.../2.1.0.2.0/package/scripts/hdfs_namenode.py | 1 -
.../HDFS/2.1.0.2.0/package/scripts/journalnode.py | 2 +-
.../HDFS/2.1.0.2.0/package/scripts/namenode.py | 19 +++++++-
.../HDFS/2.1.0.2.0/package/scripts/params_linux.py | 53 +++++++++++++---------
.../HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py | 3 ++
.../test/python/stacks/2.0.6/HDFS/test_namenode.py | 2 +-
10 files changed, 98 insertions(+), 44 deletions(-)
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
index 2b8d5f7..6293436 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/system.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -259,7 +259,8 @@ class ExecuteProvider(Provider):
timeout_kill_strategy=self.resource.timeout_kill_strategy,
on_new_line=self.resource.on_new_line,
stdout=self.resource.stdout,stderr=self.resource.stderr,
- tries=self.resource.tries, try_sleep=self.resource.try_sleep)
+ tries=self.resource.tries, try_sleep=self.resource.try_sleep,
+ returns=self.resource.returns)
class ExecuteScriptProvider(Provider):
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
index 6afcca1..e0c71f8 100644
--- a/ambari-common/src/main/python/resource_management/core/shell.py
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -90,7 +90,7 @@ def preexec_fn():
@log_function_call
def checked_call(command, quiet=False, logoutput=None, stdout=subprocess32.PIPE,stderr=subprocess32.STDOUT,
cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
- path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT):
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT, returns=[0]):
"""
Execute the shell command and throw an exception on failure.
@throws Fail
@@ -99,12 +99,12 @@ def checked_call(command, quiet=False, logoutput=None, stdout=subprocess32.PIPE,
return _call_wrapper(command, logoutput=logoutput, throw_on_failure=True, stdout=stdout, stderr=stderr,
cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish,
on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
- tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy)
+ tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy, returns=returns)
@log_function_call
def call(command, quiet=False, logoutput=None, stdout=subprocess32.PIPE,stderr=subprocess32.STDOUT,
cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
- path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT):
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT, returns=[0]):
"""
Execute the shell command despite failures.
@return: return_code, output
@@ -112,7 +112,7 @@ def call(command, quiet=False, logoutput=None, stdout=subprocess32.PIPE,stderr=s
return _call_wrapper(command, logoutput=logoutput, throw_on_failure=False, stdout=stdout, stderr=stderr,
cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish,
on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
- tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy)
+ tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy, returns=returns)
@log_function_call
def non_blocking_call(command, quiet=False, stdout=subprocess32.PIPE,stderr=subprocess32.STDOUT,
@@ -166,7 +166,7 @@ def _call_wrapper(command, **kwargs):
def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess32.PIPE,stderr=subprocess32.STDOUT,
cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
- path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT):
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT, returns=[0]):
"""
Execute shell command
@@ -303,7 +303,7 @@ def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess32.PI
code = proc.returncode
- if throw_on_failure and code:
+ if throw_on_failure and not code in returns:
err_msg = Logger.filter_text("Execution of '{0}' returned {1}. {2}".format(command_alias, code, all_output))
raise ExecutionFailed(err_msg, code, out, err)
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py b/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
index cea8a18..05fff1f 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
@@ -44,14 +44,17 @@ NAMENODE_RPC_NON_HA = 'dfs.namenode.rpc-address'
JMX_URI_FRAGMENT = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem"
INADDR_ANY = '0.0.0.0'
-def get_namenode_states(hdfs_site, security_enabled, run_user, times=10, sleep_time=1, backoff_factor=2):
+class NoActiveNamenodeException(Fail):
+ pass
+
+def get_namenode_states(hdfs_site, security_enabled, run_user, times=5, sleep_time=1, backoff_factor=2, name_service=None):
"""
return format [('nn1', 'hdfs://hostname1:port1'), ('nn2', 'hdfs://hostname2:port2')] , [....], [....]
"""
@retry(times=times, sleep_time=sleep_time, backoff_factor=backoff_factor, err_class=Fail)
def doRetries(hdfs_site, security_enabled, run_user):
doRetries.attempt += 1
- active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states_noretries(hdfs_site, security_enabled, run_user, doRetries.attempt == times)
+ active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states_noretries(hdfs_site, security_enabled, run_user, doRetries.attempt == times, name_service=name_service)
Logger.info(
"NameNode HA states: active_namenodes = {0}, standby_namenodes = {1}, unknown_namenodes = {2}".format(
active_namenodes, standby_namenodes, unknown_namenodes))
@@ -66,7 +69,7 @@ def get_namenode_states(hdfs_site, security_enabled, run_user, times=10, sleep_t
return doRetries(hdfs_site, security_enabled, run_user)
-def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_retry=True):
+def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_retry=True, name_service=None):
"""
returns data for all name nodes of all name services
"""
@@ -74,7 +77,7 @@ def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_re
standby_namenodes = []
unknown_namenodes = []
- name_services = get_nameservices(hdfs_site)
+ name_services = get_nameservices(hdfs_site) if not name_service else [name_service]
for name_service in name_services:
active, standby, unknown = _get_namenode_states_noretries_single_ns(hdfs_site, name_service, security_enabled, run_user, last_retry)
active_namenodes += active
@@ -160,7 +163,7 @@ def get_active_namenode(hdfs_site, security_enabled, run_user):
if active_namenodes:
return active_namenodes[0]
- raise Fail('No active NameNode was found.')
+ raise NoActiveNamenodeException('No active NameNode was found.')
def get_property_for_active_namenodes(hdfs_site, property_name, security_enabled, run_user):
"""
@@ -174,6 +177,18 @@ def get_property_for_active_namenodes(hdfs_site, property_name, security_enabled
return result
+def get_properties_for_all_nameservices(hdfs_site, property_name):
+ name_services = get_nameservices(hdfs_site)
+
+ result = {}
+
+ if name_services:
+ for name_service in name_services:
+ result[name_service] = hdfs_site[property_name+'.'+name_service]
+ else:
+ result[None] = hdfs_site[property_name]
+
+ return result
def get_property_for_active_namenode(hdfs_site, name_service, property_name, security_enabled, run_user):
"""
@@ -187,12 +202,12 @@ def get_property_for_active_namenode(hdfs_site, name_service, property_name, sec
name_services = get_nameservices(hdfs_site)
if name_service not in name_services:
- raise Fail('Trying to get property for non-existing ns=\'{1}\'. Valid namespaces are {2}'.format(property_name, name_service, ','.join(name_services)))
+ raise Fail('Trying to get property for non-existing ns=\'{1}\'. Valid nameservices are {2}'.format(property_name, name_service, ','.join(name_services)))
- active_namenodes = get_namenode_states(hdfs_site, security_enabled, run_user)[0]
+ active_namenodes = get_namenode_states(hdfs_site, security_enabled, run_user, name_service=name_service)[0]
if not len(active_namenodes):
- raise Fail("There is no active namenodes.")
+ raise NoActiveNamenodeException("There are no active namenodes.")
active_namenode_id = active_namenodes[0][0]
value = hdfs_site[format("{property_name}.{name_service}.{active_namenode_id}")]
@@ -209,6 +224,10 @@ def get_property_for_active_namenode(hdfs_site, name_service, property_name, sec
return value
+def namenode_federation_enabled(hdfs_site):
+ name_services = get_nameservices(hdfs_site)
+ return (len(name_services) > 1)
+
def get_all_namenode_addresses(hdfs_site):
"""
- In non-ha mode it will return list of hdfs_site[dfs.namenode.http[s]-address]
@@ -286,7 +305,7 @@ def get_nameservices(hdfs_site):
name_services_string = hdfs_site.get('dfs.nameservices', None)
- if name_services_string and ',' in name_services_string:
+ if name_services_string:
import re
for ns in name_services_string.split(","):
if 'dfs.namenode.shared.edits.dir' in hdfs_site and re.match(r'.*%s$' % ns, hdfs_site['dfs.namenode.shared.edits.dir']): # better would be core_site['fs.defaultFS'] but it's not available
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index 57eff0b..897e83c 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -324,7 +324,15 @@ class HdfsResourceWebHDFS:
self.action_delayed_for_nameservice(None, action_name, main_resource)
else:
for nameservice in nameservices:
- self.action_delayed_for_nameservice(nameservice, action_name, main_resource)
+ try:
+ self.action_delayed_for_nameservice(nameservice, action_name, main_resource)
+ except namenode_ha_utils.NoActiveNamenodeException as ex:
+ # one of the nameservices can be down (during initial start, for example); no need to worry for a federated cluster
+ if len(nameservices) > 1:
+ Logger.exception("Cannot run HdfsResource for nameservice {0}. Due to no active namenode present".format(nameservice))
+ else:
+ raise
+
def action_delayed_for_nameservice(self, nameservice, action_name, main_resource):
self.util = WebHDFSUtil(main_resource.resource.hdfs_site, nameservice, main_resource.resource.user,
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
index 2224f72..d935792 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
@@ -120,7 +120,6 @@ def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None,
format_namenode()
pass
-
if params.dfs_ha_enabled and \
params.dfs_ha_namenode_standby is not None and \
(params.hostname == params.dfs_ha_namenode_standby or params.public_hostname == params.dfs_ha_namenode_standby):
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
index 75b2eeb..1adba1c 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
@@ -85,7 +85,7 @@ class JournalNodeDefault(JournalNode):
def configure(self, env):
import params
- Directory(params.jn_edits_dir,
+ Directory(params.jn_edits_dirs,
create_parents = True,
cd_access="a",
owner=params.hdfs_user,
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 9172213..e4a9845 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -98,13 +98,28 @@ class NameNode(Script):
import params
env.set_params(params)
- format_namenode()
+ if params.security_enabled:
+ Execute(params.nn_kinit_cmd,
+ user=params.hdfs_user
+ )
+
+ # this is run on a new namenode, format needs to be forced
+ Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"),
+ user = params.hdfs_user,
+ path = [params.hadoop_bin_dir],
+ logoutput=True
+ )
def bootstrap_standby(self, env):
import params
env.set_params(params)
- Execute("hdfs namenode -bootstrapStandby",
+ if params.security_enabled:
+ Execute(params.nn_kinit_cmd,
+ user=params.hdfs_user
+ )
+
+ Execute("hdfs namenode -bootstrapStandby -nonInteractive",
user=params.hdfs_user,
logoutput=True
)
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
index 7e06829..1a6b2a8 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
@@ -44,6 +44,7 @@ from resource_management.libraries.functions.hdfs_utils import is_https_enabled_
from resource_management.libraries.functions import is_empty
from resource_management.libraries.functions.get_architecture import get_architecture
from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config
+from resource_management.libraries.functions.namenode_ha_utils import get_properties_for_all_nameservices, namenode_federation_enabled
config = Script.get_config()
tmp_dir = Script.get_tmp_dir()
@@ -242,8 +243,13 @@ nfs_file_dump_dir = config['configurations']['hdfs-site']['nfs.file.dump.dir']
dfs_domain_socket_path = config['configurations']['hdfs-site']['dfs.domain.socket.path']
dfs_domain_socket_dir = os.path.dirname(dfs_domain_socket_path)
+hdfs_site = config['configurations']['hdfs-site']
+
-jn_edits_dir = config['configurations']['hdfs-site']['dfs.journalnode.edits.dir']
+if namenode_federation_enabled(hdfs_site):
+ jn_edits_dirs = get_properties_for_all_nameservices(hdfs_site, 'dfs.journalnode.edits.dir').values()
+else:
+ jn_edits_dirs = [config['configurations']['hdfs-site']['dfs.journalnode.edits.dir']]
dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir']
@@ -290,7 +296,7 @@ dfs_ha_enabled = False
dfs_ha_nameservices = default('/configurations/hdfs-site/dfs.internal.nameservices', None)
if dfs_ha_nameservices is None:
dfs_ha_nameservices = default('/configurations/hdfs-site/dfs.nameservices', None)
-dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)
+dfs_ha_namenode_ids_all_ns = get_properties_for_all_nameservices(hdfs_site, 'dfs.ha.namenodes')
dfs_ha_automatic_failover_enabled = default("/configurations/hdfs-site/dfs.ha.automatic-failover.enabled", False)
# hostname of the active HDFS HA Namenode (only used when HA is enabled)
@@ -308,26 +314,29 @@ namenode_rpc = None
dfs_ha_namemodes_ids_list = []
other_namenode_id = None
-if dfs_ha_namenode_ids:
- dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
- dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
- if dfs_ha_namenode_ids_array_len > 1:
- dfs_ha_enabled = True
-if dfs_ha_enabled:
- for nn_id in dfs_ha_namemodes_ids_list:
- nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
- if hostname.lower() in nn_host.lower():
- namenode_id = nn_id
- namenode_rpc = nn_host
- elif public_hostname.lower() in nn_host.lower():
- namenode_id = nn_id
- namenode_rpc = nn_host
- # With HA enabled namenode_address is recomputed
- namenode_address = format('hdfs://{dfs_ha_nameservices}')
-
- # Calculate the namenode id of the other namenode. This is needed during RU to initiate an HA failover using ZKFC.
- if namenode_id is not None and len(dfs_ha_namemodes_ids_list) == 2:
- other_namenode_id = list(set(dfs_ha_namemodes_ids_list) - set([namenode_id]))[0]
+for ns, dfs_ha_namenode_ids in dfs_ha_namenode_ids_all_ns.iteritems():
+ found = False
+ if not is_empty(dfs_ha_namenode_ids):
+ dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
+ dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
+ if dfs_ha_namenode_ids_array_len > 1:
+ dfs_ha_enabled = True
+ if dfs_ha_enabled:
+ for nn_id in dfs_ha_namemodes_ids_list:
+ nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
+ if hostname.lower() in nn_host.lower() or public_hostname.lower() in nn_host.lower():
+ namenode_id = nn_id
+ namenode_rpc = nn_host
+ found = True
+ # With HA enabled namenode_address is recomputed
+ namenode_address = format('hdfs://{dfs_ha_nameservices}')
+
+ # Calculate the namenode id of the other namenode. This is needed during RU to initiate an HA failover using ZKFC.
+ if namenode_id is not None and len(dfs_ha_namemodes_ids_list) == 2:
+ other_namenode_id = list(set(dfs_ha_namemodes_ids_list) - set([namenode_id]))[0]
+
+ if found:
+ break
if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY":
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
index 677ccfa..342485d 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
@@ -59,7 +59,10 @@ class ZkfcSlave(Script):
import params
env.set_params(params)
+ utils.set_up_zkfc_security(params)
+
Execute("hdfs zkfc -formatZK -nonInteractive",
+ returns=[0, 2], # Returns 0 on success ; Returns 2 if zkfc is already formatted
user=params.hdfs_user,
logoutput=True
)
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
index 2982f8a..303ad3a 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
@@ -1377,7 +1377,7 @@ class TestNamenode(RMFTestCase):
config_dict = json_content,
stack_version = self.STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES,
- call_mocks = [(0, None, ''), (0, None)],
+ call_mocks = [(0, None),(0, None, ''), (0, None)],
mocks_dict=mocks_dict)
self.assertTrue(setup_ranger_plugin_mock.called)
--
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.