You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/06/28 00:24:34 UTC
[36/51] [partial] ambari git commit: AMBARI-21349. Create BigInsights
Stack Skeleton in Ambari 2.5 (alejandro)
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py
new file mode 100755
index 0000000..46bd926
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py
@@ -0,0 +1,319 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+import os
+import json
+import tempfile
+from resource_management import *
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+ FILE_TYPE_XML
+from resource_management.libraries.functions.version import compare_versions, \
+ format_stack_version
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions import get_klist_path
+
+import namenode_upgrade
+from hdfs_namenode import namenode, wait_for_safemode_off
+from hdfs import hdfs
+import hdfs_rebalance
+from utils import failover_namenode, get_dfsadmin_base_command
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
+
+# hashlib is supplied as of Python 2.5 as the replacement interface for md5
+# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if
+# available, avoiding a deprecation warning under 2.6. Import md5 otherwise,
+# preserving 2.4 compatibility.
+try:
+ import hashlib
+ _md5 = hashlib.md5
+except ImportError:
+ import md5
+ _md5 = md5.new
+
+class NameNode(Script):
+
+ def get_component_name(self):
+ return "hadoop-hdfs-namenode"
+
+ def install(self, env):
+ import params
+
+ self.install_packages(env)
+ env.set_params(params)
+ #TODO we need this for HA because of manual steps
+ self.configure(env)
+
+ def prepare_rolling_upgrade(self, env):
+ namenode_upgrade.prepare_rolling_upgrade()
+
+ def finalize_rolling_upgrade(self, env):
+ namenode_upgrade.finalize_rolling_upgrade()
+
+ def wait_for_safemode_off(self, env):
+ wait_for_safemode_off(30, True)
+
+ def finalize_non_rolling_upgrade(self, env):
+ namenode_upgrade.finalize_upgrade("nonrolling")
+
+ def finalize_rolling_upgrade(self, env):
+ namenode_upgrade.finalize_upgrade("rolling")
+
+ def pre_upgrade_restart(self, env, upgrade_type=None):
+ Logger.info("Executing Stack Upgrade pre-restart")
+ import params
+ env.set_params(params)
+
+ if params.version and compare_versions(format_stack_version(params.version), '4.0.0.0') >= 0:
+ conf_select.select(params.stack_name, "hadoop", params.version)
+ stack_select.select("hadoop-hdfs-namenode", params.version)
+ #Execute(format("stack-select set hadoop-hdfs-namenode {version}"))
+
+ def start(self, env, upgrade_type=None):
+ import params
+
+ env.set_params(params)
+ self.configure(env)
+ namenode(action="start", upgrade_type=upgrade_type, env=env)
+
+ def post_upgrade_restart(self, env, upgrade_type=None):
+ Logger.info("Executing Stack Upgrade post-restart")
+ import params
+ env.set_params(params)
+
+ dfsadmin_base_command = get_dfsadmin_base_command('hdfs')
+ dfsadmin_cmd = dfsadmin_base_command + " -report -live"
+ Execute(dfsadmin_cmd,
+ user=params.hdfs_user,
+ tries=60,
+ try_sleep=10
+ )
+
+ def stop(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ if upgrade_type == "rolling" and params.dfs_ha_enabled:
+ if params.dfs_ha_automatic_failover_enabled:
+ failover_namenode()
+ else:
+ raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
+
+ namenode(action="stop", upgrade_type=upgrade_type, env=env)
+
+ def configure(self, env):
+ import params
+
+ env.set_params(params)
+ hdfs()
+ namenode(action="configure", env=env)
+ pass
+
+ def status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ check_process_status(status_params.namenode_pid_file)
+ pass
+
+ def security_status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ props_value_check = {"hadoop.security.authentication": "kerberos",
+ "hadoop.security.authorization": "true"}
+ props_empty_check = ["hadoop.security.auth_to_local"]
+ props_read_check = None
+ core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+ props_read_check)
+ props_value_check = None
+ props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal',
+ 'dfs.namenode.keytab.file',
+ 'dfs.namenode.kerberos.principal']
+ props_read_check = ['dfs.namenode.keytab.file']
+ hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+ props_read_check)
+
+ hdfs_expectations = {}
+ hdfs_expectations.update(core_site_expectations)
+ hdfs_expectations.update(hdfs_site_expectations)
+
+ security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+ {'core-site.xml': FILE_TYPE_XML,
+ 'hdfs-site.xml': FILE_TYPE_XML})
+ if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+ security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+ result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+ if not result_issues: # If all validations passed successfully
+ try:
+ # Double check the dict before calling execute
+ if ( 'hdfs-site' not in security_params
+ or 'dfs.namenode.keytab.file' not in security_params['hdfs-site']
+ or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']):
+ self.put_structured_out({"securityState": "UNSECURED"})
+ self.put_structured_out(
+ {"securityIssuesFound": "Keytab file or principal are not set property."})
+ return
+ cached_kinit_executor(status_params.kinit_path_local,
+ status_params.hdfs_user,
+ security_params['hdfs-site']['dfs.namenode.keytab.file'],
+ security_params['hdfs-site']['dfs.namenode.kerberos.principal'],
+ status_params.hostname,
+ status_params.tmp_dir)
+ self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+ except Exception as e:
+ self.put_structured_out({"securityState": "ERROR"})
+ self.put_structured_out({"securityStateErrorInfo": str(e)})
+ else:
+ issues = []
+ for cf in result_issues:
+ issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+ self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+ self.put_structured_out({"securityState": "UNSECURED"})
+ else:
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+
+ def decommission(self, env):
+ import params
+
+ env.set_params(params)
+ namenode(action="decommission")
+ pass
+
+
+ def rebalancehdfs(self, env):
+ import params
+ env.set_params(params)
+
+ name_node_parameters = json.loads( params.name_node_params )
+ threshold = name_node_parameters['threshold']
+ _print("Starting balancer with threshold = %s\n" % threshold)
+
+ rebalance_env = {'PATH': params.hadoop_bin_dir}
+
+ if params.security_enabled:
+ # Create the kerberos credentials cache (ccache) file and set it in the environment to use
+ # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file
+ # to generate a (relatively) unique cache filename so that we can use it as needed.
+ # TODO: params.tmp_dir=/var/lib/ambari-agent/data/tmp. However hdfs user doesn't have access to this path.
+ # TODO: Hence using /tmp
+ ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest()
+ ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name)
+ rebalance_env['KRB5CCNAME'] = ccache_file_path
+
+ # If there are no tickets in the cache or they are expired, perform a kinit, else use what
+ # is in the cache
+ klist_cmd = format("{klist_path_local} -s {ccache_file_path}")
+ kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}")
+ if os.system(klist_cmd) != 0:
+ Execute(kinit_cmd, user=params.hdfs_user)
+
+ def calculateCompletePercent(first, current):
+ return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
+
+
+ def startRebalancingProcess(threshold, rebalance_env):
+ rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}')
+ return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env)
+
+ command = startRebalancingProcess(threshold, rebalance_env)
+
+ basedir = os.path.join(env.config.basedir, 'scripts')
+ if(threshold == 'DEBUG'): #FIXME TODO remove this on PROD
+ basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator')
+ command = ['python','hdfs-command.py']
+
+ _print("Executing command %s\n" % command)
+
+ parser = hdfs_rebalance.HdfsParser()
+
+ def handle_new_line(line, is_stderr):
+ if is_stderr:
+ return
+
+ _print('[balancer] %s' % (line))
+ pl = parser.parseLine(line)
+ if pl:
+ res = pl.toJson()
+ res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)
+
+ self.put_structured_out(res)
+ elif parser.state == 'PROCESS_FINISHED' :
+ _print('[balancer] %s' % ('Process is finished' ))
+ self.put_structured_out({'completePercent' : 1})
+ return
+
+ Execute(command,
+ on_new_line = handle_new_line,
+ logoutput = False,
+ )
+
+ if params.security_enabled and os.path.exists(ccache_file_path):
+ # Delete the kerberos credentials cache (ccache) file
+ os.remove(ccache_file_path)
+
+ def prepare_express_upgrade(self, env):
+ """
+ During an Express Upgrade.
+ If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and
+ make sure that there is no "/previous" directory.
+
+ Create a list of all the DataNodes in the cluster.
+ hdfs dfsadmin -report > dfs-old-report-1.log
+
+ hdfs dfsadmin -safemode enter
+ hdfs dfsadmin -saveNamespace
+
+ Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory.
+
+ Finalize any prior HDFS upgrade,
+ hdfs dfsadmin -finalizeUpgrade
+
+ Prepare for a NameNode rolling upgrade in order to not lose any data.
+ hdfs dfsadmin -rollingUpgrade prepare
+ """
+ import params
+ Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.")
+
+ if params.security_enabled:
+ kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
+ Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+
+ namenode_upgrade.prepare_upgrade_check_for_previous_dir()
+ namenode_upgrade.prepare_upgrade_enter_safe_mode()
+ namenode_upgrade.prepare_upgrade_save_namespace()
+ namenode_upgrade.prepare_upgrade_backup_namenode_dir()
+ namenode_upgrade.prepare_upgrade_finalize_previous_upgrades()
+
+ # Call -rollingUpgrade prepare
+ namenode_upgrade.prepare_rolling_upgrade()
+
+def _print(line):
+ sys.stdout.write(line)
+ sys.stdout.flush()
+
+if __name__ == "__main__":
+ NameNode().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py
new file mode 100755
index 0000000..a5893b6
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py
@@ -0,0 +1,205 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.default import default
+from utils import get_value_from_jmx
+
+
+class NAMENODE_STATE:
+ ACTIVE = "active"
+ STANDBY = "standby"
+ UNKNOWN = "unknown"
+
+
+class NamenodeHAState:
+ """
+ Represents the current state of the Namenode Hosts in High Availability Mode
+ """
+
+ def __init__(self):
+ """
+ Initializes all fields by querying the Namenode state.
+ Raises a ValueError if unable to construct the object.
+ """
+ import params
+
+ self.name_service = default("/configurations/hdfs-site/dfs.nameservices", None)
+ if not self.name_service:
+ raise ValueError("Could not retrieve property dfs.nameservices")
+
+ nn_unique_ids_key = "dfs.ha.namenodes." + str(self.name_service)
+ # List of the nn unique ids
+ self.nn_unique_ids = default("/configurations/hdfs-site/" + nn_unique_ids_key, None)
+ if not self.nn_unique_ids:
+ raise ValueError("Could not retrieve property " + nn_unique_ids_key)
+
+ self.nn_unique_ids = self.nn_unique_ids.split(",")
+ self.nn_unique_ids = [x.strip() for x in self.nn_unique_ids]
+
+ policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY")
+ self.encrypted = policy.upper() == "HTTPS_ONLY"
+
+ jmx_uri_fragment = ("https" if self.encrypted else "http") + "://{0}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
+ namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}"
+ namenode_https_fragment = "dfs.namenode.https-address.{0}.{1}"
+
+ # Dictionary where the key is the Namenode State (e.g., ACTIVE), and the value is a set of hostnames
+ self.namenode_state_to_hostnames = {}
+
+ # Dictionary from nn unique id name to a tuple of (http address, https address)
+ self.nn_unique_id_to_addresses = {}
+ for nn_unique_id in self.nn_unique_ids:
+ http_key = namenode_http_fragment.format(self.name_service, nn_unique_id)
+ https_key = namenode_https_fragment.format(self.name_service, nn_unique_id)
+
+ http_value = default("/configurations/hdfs-site/" + http_key, None)
+ https_value = default("/configurations/hdfs-site/" + https_key, None)
+ actual_value = https_value if self.encrypted else http_value
+ hostname = actual_value.split(":")[0].strip() if actual_value and ":" in actual_value else None
+
+ self.nn_unique_id_to_addresses[nn_unique_id] = (http_value, https_value)
+ try:
+ if not hostname:
+ raise Exception("Could not retrieve hostname from address " + actual_value)
+
+ jmx_uri = jmx_uri_fragment.format(actual_value)
+ state = get_value_from_jmx(jmx_uri, "State")
+
+ if not state:
+ raise Exception("Could not retrieve Namenode state from URL " + jmx_uri)
+
+ state = state.lower()
+
+ if state not in [NAMENODE_STATE.ACTIVE, NAMENODE_STATE.STANDBY]:
+ state = NAMENODE_STATE.UNKNOWN
+
+ if state in self.namenode_state_to_hostnames:
+ self.namenode_state_to_hostnames[state].add(hostname)
+ else:
+ hostnames = set([hostname, ])
+ self.namenode_state_to_hostnames[state] = hostnames
+ except:
+ Logger.error("Could not get namenode state for " + nn_unique_id)
+
+ def __str__(self):
+ return "Namenode HA State: {\n" + \
+ ("IDs: %s\n" % ", ".join(self.nn_unique_ids)) + \
+ ("Addresses: %s\n" % str(self.nn_unique_id_to_addresses)) + \
+ ("States: %s\n" % str(self.namenode_state_to_hostnames)) + \
+ ("Encrypted: %s\n" % str(self.encrypted)) + \
+ ("Healthy: %s\n" % str(self.is_healthy())) + \
+ "}"
+
+ def is_encrypted(self):
+ """
+ :return: Returns a bool indicating if HTTPS is enabled
+ """
+ return self.encrypted
+
+ def get_nn_unique_ids(self):
+ """
+ :return Returns a list of the nn unique ids
+ """
+ return self.nn_unique_ids
+
+ def get_nn_unique_id_to_addresses(self):
+ """
+ :return Returns a dictionary where the key is the nn unique id, and the value is a tuple of (http address, https address)
+ Each address is of the form, hostname:port
+ """
+ return self.nn_unique_id_to_addresses
+
+ def get_address_for_nn_id(self, id):
+ """
+ :param id: Namenode ID
+ :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given namenode id.
+ """
+ if id in self.nn_unique_id_to_addresses:
+ addresses = self.nn_unique_id_to_addresses[id]
+ if addresses and len(addresses) == 2:
+ return addresses[1] if self.encrypted else addresses[0]
+ return None
+
+ def get_address_for_host(self, hostname):
+ """
+ :param hostname: Host name
+ :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given host.
+ """
+ for id, addresses in self.nn_unique_id_to_addresses.iteritems():
+ if addresses and len(addresses) == 2:
+ if ":" in addresses[0]:
+ nn_hostname = addresses[0].split(":")[0].strip()
+ if nn_hostname == hostname:
+ # Found the host
+ return addresses[1] if self.encrypted else addresses[0]
+ return None
+
+ def get_namenode_state_to_hostnames(self):
+ """
+ :return Return a dictionary where the key is a member of NAMENODE_STATE, and the value is a set of hostnames.
+ """
+ return self.namenode_state_to_hostnames
+
+ def get_address(self, namenode_state):
+ """
+ @param namenode_state: Member of NAMENODE_STATE
+ :return Get the address that corresponds to the first host with the given state
+ """
+ hosts = self.namenode_state_to_hostnames[namenode_state] if namenode_state in self.namenode_state_to_hostnames else []
+ if hosts and len(hosts) > 0:
+ hostname = list(hosts)[0]
+ return self.get_address_for_host(hostname)
+ return None
+
+ def is_active(self, host_name):
+ """
+ :param host_name: Host name
+ :return: Return True if this is the active NameNode, otherwise, False.
+ """
+ return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE)
+
+ def is_standby(self, host_name):
+ """
+ :param host_name: Host name
+ :return: Return True if this is the standby NameNode, otherwise, False.
+ """
+ return self._is_in_state(host_name, NAMENODE_STATE.STANDBY)
+
+ def _is_in_state(self, host_name, state):
+ """
+ :param host_name: Host name
+ :param state: State to check
+ :return: Return True if this NameNode is in the specified state, otherwise, False.
+ """
+ mapping = self.get_namenode_state_to_hostnames()
+ if state in mapping:
+ hosts_in_state = mapping[state]
+ if hosts_in_state is not None and len(hosts_in_state) == 1 and next(iter(hosts_in_state)).lower() == host_name.lower():
+ return True
+ return False
+
+ def is_healthy(self):
+ """
+ :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist.
+ """
+ active_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.ACTIVE] if NAMENODE_STATE.ACTIVE in self.namenode_state_to_hostnames else []
+ standby_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.STANDBY] if NAMENODE_STATE.STANDBY in self.namenode_state_to_hostnames else []
+ return len(active_hosts) == 1 and len(standby_hosts) == 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py
new file mode 100755
index 0000000..5969fcf
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py
@@ -0,0 +1,262 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+import os
+
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.default import default
+from resource_management.core.shell import call
+from resource_management.core.shell import as_user
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions import Direction, SafeMode
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.core.exceptions import Fail
+from utils import get_dfsadmin_base_command
+from namenode_ha_state import NamenodeHAState
+
+safemode_to_instruction = {SafeMode.ON: "enter",
+ SafeMode.OFF: "leave"}
+
+def prepare_upgrade_check_for_previous_dir():
+ """
+ During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data.
+ Check that there is no "previous" folder inside the NameNode Name Dir.
+ """
+ import params
+
+ if params.dfs_ha_enabled:
+ namenode_ha = NamenodeHAState()
+ if namenode_ha.is_active(params.hostname):
+ Logger.info("NameNode High Availability is enabled and this is the Active NameNode.")
+
+ problematic_previous_namenode_dirs = set()
+ nn_name_dirs = params.dfs_name_dir.split(',')
+ for nn_dir in nn_name_dirs:
+ if os.path.isdir(nn_dir):
+ # Check for a previous folder, which is not allowed.
+ previous_dir = os.path.join(nn_dir, "previous")
+ if os.path.isdir(previous_dir):
+ problematic_previous_namenode_dirs.add(previous_dir)
+
+ if len(problematic_previous_namenode_dirs) > 0:
+ message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \
+ 'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \
+ 'NameNode Name Dir(s): {0}\n' \
+ '***** Then, retry this step. *****'.format(", ".join(problematic_previous_namenode_dirs))
+ Logger.error(message)
+ raise Fail(message)
+
+def prepare_upgrade_enter_safe_mode():
+ """
+ During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode.
+ """
+ import params
+
+ dfsadmin_base_command = get_dfsadmin_base_command('hdfs')
+ safe_mode_enter_cmd = dfsadmin_base_command + " -safemode enter"
+ try:
+ # Safe to call if already in Safe Mode
+ desired_state = SafeMode.ON
+ safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, desired_state, params.dfs_ha_enabled)
+ Logger.info("Transition successful: {0}, original state: {1}".format(str(safemode_transition_successful), str(original_state)))
+ if not safemode_transition_successful:
+ raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(desired_state))
+ except Exception, e:
+ message = "Could not enter safemode. Error: {0}. As the HDFS user, call this command: {1}".format(str(e), safe_mode_enter_cmd)
+ Logger.error(message)
+ raise Fail(message)
+
+def prepare_upgrade_save_namespace():
+ """
+ During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace.
+ """
+ import params
+
+ dfsadmin_base_command = get_dfsadmin_base_command('hdfs')
+ save_namespace_cmd = dfsadmin_base_command + " -saveNamespace"
+ try:
+ Logger.info("Checkpoint the current namespace.")
+ as_user(save_namespace_cmd, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
+ except Exception, e:
+ message = format("Could not save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}")
+ Logger.error(message)
+ raise Fail(message)
+
+def prepare_upgrade_backup_namenode_dir():
+ """
+ During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs.
+ """
+ import params
+
+ i = 0
+ failed_paths = []
+ nn_name_dirs = params.dfs_name_dir.split(',')
+ backup_destination_root_dir = "/tmp/upgrades/{0}".format(params.stack_version_unformatted)
+ if len(nn_name_dirs) > 0:
+ Logger.info("Backup the NameNode name directory's CURRENT folder.")
+ for nn_dir in nn_name_dirs:
+ i += 1
+ namenode_current_image = os.path.join(nn_dir, "current")
+ unique = get_unique_id_and_date() + "_" + str(i)
+ # Note that /tmp may not be writeable.
+ backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique)
+
+ if os.path.isdir(namenode_current_image) and not os.path.isdir(backup_current_folder):
+ try:
+ os.makedirs(backup_current_folder)
+ Execute(('cp', '-ar', namenode_current_image, backup_current_folder),
+ sudo=True
+ )
+ except Exception, e:
+ failed_paths.append(namenode_current_image)
+ if len(failed_paths) > 0:
+ Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is "
+ "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir,
+ ", ".join(failed_paths)))
+
+def prepare_upgrade_finalize_previous_upgrades():
+ """
+ During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress.
+ """
+ import params
+
+ dfsadmin_base_command = get_dfsadmin_base_command('hdfs')
+ finalize_command = dfsadmin_base_command + " -rollingUpgrade finalize"
+ try:
+ Logger.info("Attempt to Finalize if there are any in-progress upgrades. "
+ "This will return 255 if no upgrades are in progress.")
+ code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user)
+ if out:
+ expected_substring = "there is no rolling upgrade in progress"
+ if expected_substring not in out.lower():
+ Logger.warning('Finalize command did not contain substring: %s' % expected_substring)
+ else:
+ Logger.warning("Finalize command did not return any output.")
+ except Exception, e:
+ Logger.warning("Ensure no upgrades are in progress.")
+
+
+def reach_safemode_state(user, safemode_state, in_ha):
+ """
+ Enter or leave safemode for the Namenode.
+ @param user: user to perform action as
+ @param safemode_state: Desired state of ON or OFF
+ @param in_ha: bool indicating if Namenode High Availability is enabled
+ @:return Returns a tuple of (transition success, original state). If no change is needed, the indicator of
+ success will be True
+ """
+ Logger.info("Prepare to transition into safemode state %s" % safemode_state)
+ import params
+ original_state = SafeMode.UNKNOWN
+
+ hostname = params.hostname
+ safemode_check = format("su - {user} -c 'hdfs dfsadmin -safemode get'")
+
+ grep_pattern = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
+ safemode_check_with_grep = format("su - {user} -c 'hdfs dfsadmin -safemode get | grep \"{grep_pattern}\"'")
+ code, out = call(safemode_check)
+ Logger.info("Command: %s\nCode: %d." % (safemode_check, code))
+ if code == 0 and out is not None:
+ Logger.info(out)
+ re_pattern = r"Safe mode is (\S*) in " + hostname.replace(".", "\\.") if in_ha else r"Safe mode is (\S*)"
+ m = re.search(re_pattern, out, re.IGNORECASE)
+ if m and len(m.groups()) >= 1:
+ original_state = m.group(1).upper()
+
+ if original_state == safemode_state:
+ return (True, original_state)
+ else:
+ # Make a transition
+ command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state])
+ Execute(command,
+ user=user,
+ logoutput=True,
+ path=[params.hadoop_bin_dir])
+
+ code, out = call(safemode_check_with_grep)
+ Logger.info("Command: %s\nCode: %d. Out: %s" % (safemode_check_with_grep, code, out))
+ if code == 0:
+ return (True, original_state)
+ return (False, original_state)
+
+
+def prepare_rolling_upgrade():
+ """
+ Perform either an upgrade or a downgrade.
+
+ Rolling Upgrade for HDFS Namenode requires the following.
+ 0. Namenode must be up
+ 1. Leave safemode if the safemode status is not OFF
+ 2. Execute a rolling upgrade "prepare"
+ 3. Execute a rolling upgrade "query"
+ """
+ import params
+
+ if not params.upgrade_direction or params.upgrade_direction not in [Direction.UPGRADE, Direction.DOWNGRADE]:
+ raise Fail("Could not retrieve upgrade direction: %s" % str(params.upgrade_direction))
+ Logger.info(format("Performing a(n) {params.upgrade_direction} of HDFS"))
+
+ if params.security_enabled:
+ Execute(format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}"))
+
+
+ if params.upgrade_direction == Direction.UPGRADE:
+ if params.dfs_ha_enabled:
+ Logger.info('High Availability is enabled, must leave safemode before calling "-rollingUpgrade prepare"')
+ desired_state = SafeMode.OFF
+ safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True)
+ if not safemode_transition_successful:
+ raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SafeMode.OFF))
+
+ prepare = "hdfs dfsadmin -rollingUpgrade prepare"
+ query = "hdfs dfsadmin -rollingUpgrade query"
+ Execute(prepare,
+ user=params.hdfs_user,
+ logoutput=True)
+ Execute(query,
+ user=params.hdfs_user,
+ logoutput=True)
+ elif params.upgrade_direction == Direction.DOWNGRADE:
+ pass
+
+def finalize_upgrade(upgrade_type):
+ """
+ Finalize the Namenode upgrade, at which point it cannot be downgraded.
+ :param upgrade_type rolling or nonrolling
+ """
+ Logger.info("Executing Rolling Upgrade finalize")
+ import params
+
+ if params.security_enabled:
+ Execute(format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}"))
+
+ finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
+ query_cmd = "hdfs dfsadmin -rollingUpgrade query"
+
+ Execute(query_cmd,
+ user=params.hdfs_user,
+ logoutput=True)
+ Execute(finalize_cmd,
+ user=params.hdfs_user,
+ logoutput=True)
+ Execute(query_cmd,
+ user=params.hdfs_user,
+ logoutput=True)
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py
new file mode 100755
index 0000000..ff4778a
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py
@@ -0,0 +1,138 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+ FILE_TYPE_XML
+from hdfs_nfsgateway import nfsgateway
+from hdfs import hdfs
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions.version import compare_versions, format_stack_version
+
+
+class NFSGateway(Script):
+
+ def get_component_name(self):
+ return "hadoop-hdfs-nfs3"
+
+ def install(self, env):
+ import params
+
+ env.set_params(params)
+
+ self.install_packages(env)
+
+ def pre_upgrade_restart(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ if Script.is_stack_greater_or_equal('4.1.0.0'):
+ conf_select.select(params.stack_name, "hadoop", params.version)
+ stack_select.select("hadoop-hdfs-nfs3", params.version)
+
+ def start(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ self.configure(env)
+ nfsgateway(action="start")
+
+ def stop(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ nfsgateway(action="stop")
+
+ def configure(self, env):
+ import params
+
+ env.set_params(params)
+ hdfs()
+ nfsgateway(action="configure")
+
+ def status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+
+ check_process_status(status_params.nfsgateway_pid_file)
+
+ def security_status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ props_value_check = {"hadoop.security.authentication": "kerberos",
+ "hadoop.security.authorization": "true"}
+ props_empty_check = ["hadoop.security.auth_to_local"]
+ props_read_check = None
+ core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+ props_read_check)
+ props_value_check = None
+ props_empty_check = ['nfs.keytab.file',
+ 'nfs.kerberos.principal']
+ props_read_check = ['nfs.keytab.file']
+ hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+ props_read_check)
+
+ hdfs_expectations = {}
+ hdfs_expectations.update(core_site_expectations)
+ hdfs_expectations.update(hdfs_site_expectations)
+
+ security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+ {'core-site.xml': FILE_TYPE_XML,
+ 'hdfs-site.xml': FILE_TYPE_XML})
+ if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+ security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+ result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+ if not result_issues: # If all validations passed successfully
+ try:
+ # Double check the dict before calling execute
+ if ('hdfs-site' not in security_params or
+ 'nfs.keytab.file' not in security_params['hdfs-site'] or
+ 'nfs.kerberos.principal' not in security_params['hdfs-site']):
+ self.put_structured_out({"securityState": "UNSECURED"})
+ self.put_structured_out(
+ {"securityIssuesFound": "Keytab file or principal are not set property."})
+ return
+
+ cached_kinit_executor(status_params.kinit_path_local,
+ status_params.hdfs_user,
+ security_params['hdfs-site']['nfs.keytab.file'],
+ security_params['hdfs-site'][
+ 'nfs.kerberos.principal'],
+ status_params.hostname,
+ status_params.tmp_dir)
+ self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+ except Exception as e:
+ self.put_structured_out({"securityState": "ERROR"})
+ self.put_structured_out({"securityStateErrorInfo": str(e)})
+ else:
+ issues = []
+ for cf in result_issues:
+ issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+ self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+ self.put_structured_out({"securityState": "UNSECURED"})
+ else:
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+if __name__ == "__main__":
+ NFSGateway().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py
new file mode 100755
index 0000000..1f76f80
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py
@@ -0,0 +1,326 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.functions import conf_select, stack_select
+from resource_management.libraries.functions.version import format_stack_version, compare_versions
+from resource_management.libraries.functions.default import default
+from resource_management import *
+import status_params
+import utils
+import os
+import itertools
+import re
+
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+
+stack_name = default("/hostLevelParams/stack_name", None)
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+
+stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
+stack_version = format_stack_version(stack_version_unformatted)
+
+# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
+version = default("/commandParams/version", None)
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+hdfs_user = status_params.hdfs_user
+root_user = "root"
+hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix
+
+# Some datanode settings
+dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None)
+dfs_dn_http_addr = default('/configurations/hdfs-site/dfs.datanode.http.address', None)
+dfs_dn_https_addr = default('/configurations/hdfs-site/dfs.datanode.https.address', None)
+dfs_http_policy = default('/configurations/hdfs-site/dfs.http.policy', None)
+dfs_dn_ipc_address = config['configurations']['hdfs-site']['dfs.datanode.ipc.address']
+secure_dn_ports_are_in_use = False
+
+#hadoop params
+mapreduce_libs_path = "/usr/iop/current/hadoop-mapreduce-client/*"
+hadoop_libexec_dir = stack_select.get_hadoop_dir("libexec")
+hadoop_bin = stack_select.get_hadoop_dir("sbin")
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+hadoop_home = "/usr/iop/current/hadoop-client"
+if not security_enabled:
+ hadoop_secure_dn_user = '""'
+else:
+ dfs_dn_port = utils.get_port(dfs_dn_addr)
+ dfs_dn_http_port = utils.get_port(dfs_dn_http_addr)
+ dfs_dn_https_port = utils.get_port(dfs_dn_https_addr)
+ # We try to avoid inability to start datanode as a plain user due to usage of root-owned ports
+ if dfs_http_policy == "HTTPS_ONLY":
+ secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_https_port)
+ elif dfs_http_policy == "HTTP_AND_HTTPS":
+ secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) or utils.is_secure_port(dfs_dn_https_port)
+ else: # params.dfs_http_policy == "HTTP_ONLY" or not defined:
+ secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port)
+ if secure_dn_ports_are_in_use:
+ hadoop_secure_dn_user = hdfs_user
+ else:
+ hadoop_secure_dn_user = '""'
+
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
+hadoop_conf_empty_dir = "/etc/hadoop/conf.empty"
+limits_conf_dir = "/etc/security/limits.d"
+hadoop_lib_home = stack_select.get_hadoop_dir("lib")
+ambari_libs_dir = "/var/lib/ambari-agent/lib"
+
+#snappy
+create_lib_snappy_symlinks = False
+snappy_so = "libsnappy.so"
+so_target_dir_x86 = format("{hadoop_lib_home}/native/Linux-i386-32")
+so_target_dir_x64 = format("{hadoop_lib_home}/native/Linux-amd64-64")
+so_target_x86 = format("{so_target_dir_x86}/{snappy_so}")
+so_target_x64 = format("{so_target_dir_x64}/{snappy_so}")
+so_src_dir_x86 = format("{hadoop_home}/lib")
+so_src_dir_x64 = format("{hadoop_home}/lib/native")
+so_src_x86 = format("{so_src_dir_x86}/{snappy_so}")
+so_src_x64 = format("{so_src_dir_x64}/{snappy_so}")
+
+execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir
+ulimit_cmd = "ulimit -c unlimited ; "
+
+#security params
+smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+falcon_user = config['configurations']['falcon-env']['falcon_user']
+
+#exclude file
+hdfs_exclude_file = default("/clusterHostInfo/decom_dn_hosts", [])
+exclude_file_path = config['configurations']['hdfs-site']['dfs.hosts.exclude']
+update_exclude_file_only = default("/commandParams/update_exclude_file_only",False)
+
+klist_path_local = functions.get_klist_path()
+kinit_path_local = functions.get_kinit_path()
+#hosts
+hostname = config["hostname"]
+rm_host = default("/clusterHostInfo/rm_host", [])
+slave_hosts = default("/clusterHostInfo/slave_hosts", [])
+oozie_servers = default("/clusterHostInfo/oozie_server", [])
+hcat_server_hosts = default("/clusterHostInfo/webhcat_server_host", [])
+hive_server_host = default("/clusterHostInfo/hive_server_host", [])
+hbase_master_hosts = default("/clusterHostInfo/hbase_master_hosts", [])
+hs_host = default("/clusterHostInfo/hs_host", [])
+jtnode_host = default("/clusterHostInfo/jtnode_host", [])
+namenode_host = default("/clusterHostInfo/namenode_host", [])
+nm_host = default("/clusterHostInfo/nm_host", [])
+ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", [])
+journalnode_hosts = default("/clusterHostInfo/journalnode_hosts", [])
+zkfc_hosts = default("/clusterHostInfo/zkfc_hosts", [])
+falcon_host = default("/clusterHostInfo/falcon_server_hosts", [])
+
+has_ganglia_server = not len(ganglia_server_hosts) == 0
+has_namenodes = not len(namenode_host) == 0
+has_jobtracker = not len(jtnode_host) == 0
+has_resourcemanager = not len(rm_host) == 0
+has_histroryserver = not len(hs_host) == 0
+has_hbase_masters = not len(hbase_master_hosts) == 0
+has_slaves = not len(slave_hosts) == 0
+has_oozie_server = not len(oozie_servers) == 0
+has_hcat_server_host = not len(hcat_server_hosts) == 0
+has_hive_server_host = not len(hive_server_host) == 0
+has_journalnode_hosts = not len(journalnode_hosts) == 0
+has_zkfc_hosts = not len(zkfc_hosts) == 0
+has_falcon_host = not len(falcon_host) == 0
+
+
+is_namenode_master = hostname in namenode_host
+is_jtnode_master = hostname in jtnode_host
+is_rmnode_master = hostname in rm_host
+is_hsnode_master = hostname in hs_host
+is_hbase_master = hostname in hbase_master_hosts
+is_slave = hostname in slave_hosts
+
+if has_ganglia_server:
+ ganglia_server_host = ganglia_server_hosts[0]
+
+#users and groups
+yarn_user = config['configurations']['yarn-env']['yarn_user']
+hbase_user = config['configurations']['hbase-env']['hbase_user']
+oozie_user = config['configurations']['oozie-env']['oozie_user']
+webhcat_user = config['configurations']['hive-env']['hcat_user']
+hcat_user = config['configurations']['hive-env']['hcat_user']
+hive_user = config['configurations']['hive-env']['hive_user']
+smoke_user = config['configurations']['cluster-env']['smokeuser']
+smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name']
+mapred_user = config['configurations']['mapred-env']['mapred_user']
+hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
+
+user_group = config['configurations']['cluster-env']['user_group']
+root_group = "root"
+proxyuser_group = config['configurations']['hadoop-env']['proxyuser_group']
+
+#hadoop params
+hdfs_log_dir_prefix = config['configurations']['hadoop-env']['hdfs_log_dir_prefix']
+hadoop_root_logger = config['configurations']['hadoop-env']['hadoop_root_logger']
+
+dfs_domain_socket_path = config['configurations']['hdfs-site']['dfs.domain.socket.path']
+dfs_domain_socket_dir = os.path.dirname(dfs_domain_socket_path)
+
+jn_edits_dir = config['configurations']['hdfs-site']['dfs.journalnode.edits.dir']
+
+dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir']
+
+namenode_dirs_created_stub_dir = format("{hdfs_log_dir_prefix}/{hdfs_user}")
+namenode_dirs_stub_filename = "namenode_dirs_created"
+
+smoke_hdfs_user_dir = format("/user/{smoke_user}")
+smoke_hdfs_user_mode = 0770
+
+
+hdfs_namenode_formatted_mark_suffix = "/namenode-formatted/"
+namenode_formatted_old_mark_dirs = ["/var/run/hadoop/hdfs/namenode-formatted",
+ format("{hadoop_pid_dir_prefix}/hdfs/namenode/formatted"),
+ "/var/lib/hdfs/namenode/formatted"]
+dfs_name_dirs = dfs_name_dir.split(",")
+namenode_formatted_mark_dirs = []
+for dn_dir in dfs_name_dirs:
+ tmp_mark_dir = format("{dn_dir}{hdfs_namenode_formatted_mark_suffix}")
+ namenode_formatted_mark_dirs.append(tmp_mark_dir)
+
+# Use the namenode RPC address if configured, otherwise, fallback to the default file system
+namenode_address = None
+if 'dfs.namenode.rpc-address' in config['configurations']['hdfs-site']:
+ namenode_rpcaddress = config['configurations']['hdfs-site']['dfs.namenode.rpc-address']
+ namenode_address = format("hdfs://{namenode_rpcaddress}")
+else:
+ namenode_address = config['configurations']['core-site']['fs.defaultFS']
+
+fs_checkpoint_dirs = default("/configurations/hdfs-site/dfs.namenode.checkpoint.dir", "").split(',')
+
+dfs_data_dirs = config['configurations']['hdfs-site']['dfs.datanode.data.dir']
+
+data_dir_mount_file = config['configurations']['hadoop-env']['dfs.datanode.data.dir.mount.file']
+
+# HDFS High Availability properties
+dfs_ha_enabled = False
+dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None)
+dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)
+dfs_ha_automatic_failover_enabled = default("/configurations/hdfs-site/dfs.ha.automatic-failover.enabled", False)
+
+# hostname of the active HDFS HA Namenode (only used when HA is enabled)
+dfs_ha_namenode_active = default("/configurations/hadoop-env/dfs_ha_initial_namenode_active", None)
+# hostname of the standby HDFS HA Namenode (only used when HA is enabled)
+dfs_ha_namenode_standby = default("/configurations/hadoop-env/dfs_ha_initial_namenode_standby", None)
+
+namenode_id = None
+namenode_rpc = None
+
+dfs_ha_namemodes_ids_list = []
+other_namenode_id = None
+
+if dfs_ha_namenode_ids:
+ dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
+ dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
+ if dfs_ha_namenode_ids_array_len > 1:
+ dfs_ha_enabled = True
+if dfs_ha_enabled:
+ for nn_id in dfs_ha_namemodes_ids_list:
+ nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
+ if hostname in nn_host:
+ namenode_id = nn_id
+ namenode_rpc = nn_host
+
+if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY":
+ https_only = True
+ journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.https-address', None)
+else:
+ https_only = False
+ journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.http-address', None)
+
+if journalnode_address:
+ journalnode_port = journalnode_address.split(":")[1]
+
+
+if security_enabled:
+ _dn_principal_name = config['configurations']['hdfs-site']['dfs.datanode.kerberos.principal']
+ _dn_keytab = config['configurations']['hdfs-site']['dfs.datanode.keytab.file']
+ _dn_principal_name = _dn_principal_name.replace('_HOST',hostname.lower())
+
+ dn_kinit_cmd = format("{kinit_path_local} -kt {_dn_keytab} {_dn_principal_name};")
+
+ _nn_principal_name = config['configurations']['hdfs-site']['dfs.namenode.kerberos.principal']
+ _nn_keytab = config['configurations']['hdfs-site']['dfs.namenode.keytab.file']
+ _nn_principal_name = _nn_principal_name.replace('_HOST',hostname.lower())
+
+ nn_kinit_cmd = format("{kinit_path_local} -kt {_nn_keytab} {_nn_principal_name};")
+
+ _jn_principal_name = default("/configurations/hdfs-site/dfs.journalnode.kerberos.principal", None)
+ if _jn_principal_name:
+ _jn_principal_name = _jn_principal_name.replace('_HOST', hostname.lower())
+ _jn_keytab = default("/configurations/hdfs-site/dfs.journalnode.keytab.file", None)
+ jn_kinit_cmd = format("{kinit_path_local} -kt {_jn_keytab} {_jn_principal_name};")
+else:
+ dn_kinit_cmd = ""
+ nn_kinit_cmd = ""
+ jn_kinit_cmd = ""
+
+
+hdfs_site = config['configurations']['hdfs-site']
+default_fs = config['configurations']['core-site']['fs.defaultFS']
+
+import functools
+#create partial functions with common arguments for every HdfsDirectory call
+#to create hdfs directory we need to call params.HdfsDirectory in code
+HdfsResource = functools.partial(
+ HdfsResource,
+ user=hdfs_user,
+ security_enabled = security_enabled,
+ keytab = hdfs_user_keytab,
+ kinit_path_local = kinit_path_local,
+ hadoop_bin_dir = hadoop_bin_dir,
+ hadoop_conf_dir = hadoop_conf_dir,
+ principal_name = hdfs_principal_name,
+ hdfs_site = hdfs_site,
+ default_fs = default_fs
+)
+
+io_compression_codecs = config['configurations']['core-site']['io.compression.codecs']
+if not "com.hadoop.compression.lzo" in io_compression_codecs:
+ exclude_packages = ["lzo", "hadoop-lzo", "hadoop-lzo-native", "liblzo2-2"]
+else:
+ exclude_packages = []
+name_node_params = default("/commandParams/namenode", None)
+
+#hadoop params
+hadoop_env_sh_template = config['configurations']['hadoop-env']['content']
+
+#hadoop-env.sh
+java_home = config['hostLevelParams']['java_home']
+java_version = int(config['hostLevelParams']['java_version'])
+
+jsvc_path = "/usr/lib/bigtop-utils"
+
+hadoop_heapsize = config['configurations']['hadoop-env']['hadoop_heapsize']
+namenode_heapsize = config['configurations']['hadoop-env']['namenode_heapsize']
+namenode_opt_newsize = config['configurations']['hadoop-env']['namenode_opt_newsize']
+namenode_opt_maxnewsize = config['configurations']['hadoop-env']['namenode_opt_maxnewsize']
+namenode_opt_permsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_permsize","128m")
+namenode_opt_maxpermsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_maxpermsize","256m")
+
+jtnode_opt_newsize = "200m"
+jtnode_opt_maxnewsize = "200m"
+jtnode_heapsize = "1024m"
+ttnode_heapsize = "1024m"
+
+dtnode_heapsize = config['configurations']['hadoop-env']['dtnode_heapsize']
+mapred_pid_dir_prefix = default("/configurations/mapred-env/mapred_pid_dir_prefix","/var/run/hadoop-mapreduce")
+mapred_log_dir_prefix = default("/configurations/mapred-env/mapred_log_dir_prefix","/var/log/hadoop-mapreduce")
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py
new file mode 100755
index 0000000..ffbe658
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py
@@ -0,0 +1,119 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+
+
+class HdfsServiceCheck(Script):
+ def service_check(self, env):
+ import params
+
+ env.set_params(params)
+ unique = functions.get_unique_id_and_date()
+ dir = '/tmp'
+ tmp_file = format("{dir}/{unique}")
+
+ safemode_command = "dfsadmin -safemode get | grep OFF"
+
+ create_dir_cmd = format("fs -mkdir {dir}")
+ chmod_command = format("fs -chmod 777 {dir}")
+ test_dir_exists = as_user(format("{hadoop_bin_dir}/hadoop --config {hadoop_conf_dir} fs -test -e {dir}"), params.hdfs_user)
+ cleanup_cmd = format("fs -rm {tmp_file}")
+ #cleanup put below to handle retries; if retrying there wil be a stale file
+ #that needs cleanup; exit code is fn of second command
+ create_file_cmd = format(
+ "{cleanup_cmd}; hadoop --config {hadoop_conf_dir} fs -put /etc/passwd {tmp_file}")
+ test_cmd = format("fs -test -e {tmp_file}")
+ if params.security_enabled:
+ Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+ user=params.hdfs_user
+ )
+ ExecuteHadoop(safemode_command,
+ user=params.hdfs_user,
+ logoutput=True,
+ conf_dir=params.hadoop_conf_dir,
+ try_sleep=3,
+ tries=20,
+ bin_dir=params.hadoop_bin_dir
+ )
+ ExecuteHadoop(create_dir_cmd,
+ user=params.hdfs_user,
+ logoutput=True,
+ not_if=test_dir_exists,
+ conf_dir=params.hadoop_conf_dir,
+ try_sleep=3,
+ tries=5,
+ bin_dir=params.hadoop_bin_dir
+ )
+ ExecuteHadoop(chmod_command,
+ user=params.hdfs_user,
+ logoutput=True,
+ conf_dir=params.hadoop_conf_dir,
+ try_sleep=3,
+ tries=5,
+ bin_dir=params.hadoop_bin_dir
+ )
+ ExecuteHadoop(create_file_cmd,
+ user=params.hdfs_user,
+ logoutput=True,
+ conf_dir=params.hadoop_conf_dir,
+ try_sleep=3,
+ tries=5,
+ bin_dir=params.hadoop_bin_dir
+ )
+ ExecuteHadoop(test_cmd,
+ user=params.hdfs_user,
+ logoutput=True,
+ conf_dir=params.hadoop_conf_dir,
+ try_sleep=3,
+ tries=5,
+ bin_dir=params.hadoop_bin_dir
+ )
+ if params.has_journalnode_hosts:
+ journalnode_port = params.journalnode_port
+ checkWebUIFileName = "checkWebUI.py"
+ checkWebUIFilePath = format("{tmp_dir}/{checkWebUIFileName}")
+ comma_sep_jn_hosts = ",".join(params.journalnode_hosts)
+ checkWebUICmd = format("python {checkWebUIFilePath} -m {comma_sep_jn_hosts} -p {journalnode_port} -s {https_only}")
+ File(checkWebUIFilePath,
+ content=StaticFile(checkWebUIFileName),
+ mode=0775)
+
+ Execute(checkWebUICmd,
+ logoutput=True,
+ try_sleep=3,
+ tries=5,
+ user=params.smoke_user
+ )
+
+ if params.is_namenode_master:
+ if params.has_zkfc_hosts:
+ pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
+ pid_file = format("{pid_dir}/hadoop-{hdfs_user}-zkfc.pid")
+ check_zkfc_process_cmd = format(
+ "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
+ Execute(check_zkfc_process_cmd,
+ logoutput=True,
+ try_sleep=3,
+ tries=5
+ )
+
+
+if __name__ == "__main__":
+ HdfsServiceCheck().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py
new file mode 100755
index 0000000..4224a9e
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py
@@ -0,0 +1,142 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions.version import compare_versions, format_stack_version
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+ FILE_TYPE_XML
+from resource_management.core.logger import Logger
+
+from hdfs_snamenode import snamenode
+from hdfs import hdfs
+
+class SNameNode(Script):
+
+ def get_component_name(self):
+ return "hadoop-hdfs-secondarynamenode"
+
+ def install(self, env):
+ import params
+
+ env.set_params(params)
+
+ self.install_packages(env)
+
+ def pre_upgrade_restart(self, env, upgrade_type=None):
+ Logger.info("Executing Stack Upgrade pre-restart")
+ import params
+ env.set_params(params)
+
+ if params.version and compare_versions(format_stack_version(params.version), '4.0.0.0') >= 0:
+ conf_select.select(params.stack_name, "hadoop", params.version)
+ stack_select.select("hadoop-hdfs-secondarynamenode", params.version)
+
+ def start(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ self.configure(env)
+ snamenode(action="start")
+
+ def stop(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ snamenode(action="stop")
+
+ def configure(self, env):
+ import params
+
+ env.set_params(params)
+ hdfs()
+ snamenode(action="configure")
+
+ def status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+
+ check_process_status(status_params.snamenode_pid_file)
+
+ def security_status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ props_value_check = {"hadoop.security.authentication": "kerberos",
+ "hadoop.security.authorization": "true"}
+ props_empty_check = ["hadoop.security.auth_to_local"]
+ props_read_check = None
+ core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+ props_read_check)
+ props_value_check = None
+ props_empty_check = ['dfs.secondary.namenode.kerberos.internal.spnego.principal',
+ 'dfs.secondary.namenode.keytab.file',
+ 'dfs.secondary.namenode.kerberos.principal']
+ props_read_check = ['dfs.secondary.namenode.keytab.file']
+ hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+ props_read_check)
+
+ hdfs_expectations = {}
+ hdfs_expectations.update(core_site_expectations)
+ hdfs_expectations.update(hdfs_site_expectations)
+
+ security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+ {'core-site.xml': FILE_TYPE_XML,
+ 'hdfs-site.xml': FILE_TYPE_XML})
+
+ if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+ security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+ result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+ if not result_issues: # If all validations passed successfully
+ try:
+ # Double check the dict before calling execute
+ if ('hdfs-site' not in security_params or
+ 'dfs.secondary.namenode.keytab.file' not in security_params['hdfs-site'] or
+ 'dfs.secondary.namenode.kerberos.principal' not in security_params['hdfs-site']):
+ self.put_structured_out({"securityState": "UNSECURED"})
+ self.put_structured_out(
+ {"securityIssuesFound": "Keytab file or principal are not set property."})
+ return
+
+ cached_kinit_executor(status_params.kinit_path_local,
+ status_params.hdfs_user,
+ security_params['hdfs-site']['dfs.secondary.namenode.keytab.file'],
+ security_params['hdfs-site'][
+ 'dfs.secondary.namenode.kerberos.principal'],
+ status_params.hostname,
+ status_params.tmp_dir)
+ self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+ except Exception as e:
+ self.put_structured_out({"securityState": "ERROR"})
+ self.put_structured_out({"securityStateErrorInfo": str(e)})
+ else:
+ issues = []
+ for cf in result_issues:
+ issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+ self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+ self.put_structured_out({"securityState": "UNSECURED"})
+ else:
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+
+if __name__ == "__main__":
+ SNameNode().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py
new file mode 100755
index 0000000..f1abf08
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py
@@ -0,0 +1,42 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+from resource_management.libraries.functions import conf_select
+
+config = Script.get_config()
+
+hadoop_pid_dir_prefix = config['configurations']['hadoop-env']['hadoop_pid_dir_prefix']
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
+hadoop_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
+datanode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-datanode.pid")
+namenode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-namenode.pid")
+snamenode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-secondarynamenode.pid")
+journalnode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-journalnode.pid")
+zkfc_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-zkfc.pid")
+nfsgateway_pid_file = format("{hadoop_pid_dir_prefix}/root/hadoop_privileged_nfs3.pid")
+
+# Security related/required params
+hostname = config['hostname']
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+hdfs_user_principal = config['configurations']['hadoop-env']['hdfs_principal_name']
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+kinit_path_local = functions.get_kinit_path()
+tmp_dir = Script.get_tmp_dir()
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py
new file mode 100755
index 0000000..5cd1735
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py
@@ -0,0 +1,357 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import re
+import urllib2
+import json
+
+from resource_management import *
+from resource_management.libraries.functions.format import format
+from resource_management.core.shell import call, checked_call
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.libraries.functions.curl_krb_request import curl_krb_request
+
+from zkfc_slave import ZkfcSlave
+
+def safe_zkfc_op(action, env):
+ """
+ Idempotent operation on the zkfc process to either start or stop it.
+ :param action: start or stop
+ :param env: environment
+ """
+ zkfc = None
+ if action == "start":
+ try:
+ zkfc = ZkfcSlave()
+ zkfc.status(env)
+ except ComponentIsNotRunning:
+ if zkfc:
+ zkfc.start(env)
+
+ if action == "stop":
+ try:
+ zkfc = ZkfcSlave()
+ zkfc.status(env)
+ except ComponentIsNotRunning:
+ pass
+ else:
+ if zkfc:
+ zkfc.stop(env)
+
+
+def failover_namenode():
+ """
+ Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary).
+ """
+ import params
+ check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
+ code, out = call(check_service_cmd, logoutput=True, user=params.hdfs_user)
+
+ state = "unknown"
+ if code == 0 and out:
+ state = "active" if "active" in out else ("standby" if "standby" in out else state)
+ Logger.info("Namenode service state: %s" % state)
+
+ if state == "active":
+ Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")
+
+ # Forcefully kill ZKFC on this host to initiate a failover
+ # If ZKFC is already dead, then potentially this node can still be the active one.
+ was_zkfc_killed = kill_zkfc(params.hdfs_user)
+
+ # Wait until it transitions to standby
+ check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
+
+ # process may already be down. try one time, then proceed
+ code, out = call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
+ Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
+
+ if code == 255 and out:
+ Logger.info("Rolling Upgrade - namenode is already down")
+ else:
+ if was_zkfc_killed:
+ # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
+ Execute(check_standby_cmd,
+ user=params.hdfs_user,
+ tries=50,
+ try_sleep=6,
+ logoutput=True)
+
+ else:
+ Logger.info("Rolling Upgrade - Host %s is the standby namenode." % str(params.hostname))
+
+
+def kill_zkfc(zkfc_user):
+ """
+ There are two potential methods for failing over the namenode, especially during a Rolling Upgrade.
+ Option 1. Kill zkfc on primary namenode provided that the secondary is up and has zkfc running on it.
+ Option 2. Silent failover (not supported as of IOP 4.0.0.0)
+ :param zkfc_user: User that started the ZKFC process.
+ :return: Return True if ZKFC was killed, otherwise, false.
+ """
+ import params
+ if params.dfs_ha_enabled:
+ zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
+ if zkfc_pid_file:
+ check_process = format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1")
+ code, out = call(check_process)
+ if code == 0:
+ Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
+ kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
+ Execute(kill_command)
+ Execute(format("rm -f {zkfc_pid_file}"))
+ return True
+ return False
+
+
+def get_service_pid_file(name, user):
+ """
+ Get the pid file path that was used to start the service by the user.
+ :param name: Service name
+ :param user: User that started the service.
+ :return: PID file path
+ """
+ import params
+ pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
+ pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
+ return pid_file
+
+
+def service(action=None, name=None, user=None, options="", create_pid_dir=False,
+ create_log_dir=False):
+ """
+ :param action: Either "start" or "stop"
+ :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
+ :param user: User to run the command as
+ :param options: Additional options to pass to command as a string
+ :param create_pid_dir: Create PID directory
+ :param create_log_dir: Crate log file directory
+ """
+ import params
+
+ options = options if options else ""
+ pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
+ pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
+ hadoop_env_exports = {
+ 'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir
+ }
+ log_dir = format("{hdfs_log_dir_prefix}/{user}")
+
+ # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
+ # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
+ if name == "nfs3" :
+ pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid")
+ custom_export = {
+ 'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user,
+ 'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir,
+ 'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir
+ }
+ hadoop_env_exports.update(custom_export)
+
+ check_process = format(
+ "ls {pid_file} >/dev/null 2>&1 &&"
+ " ps -p `cat {pid_file}` >/dev/null 2>&1")
+
+ # on STOP directories shouldn't be created
+ # since during stop still old dirs are used (which were created during previous start)
+ if action != "stop":
+ if name == "nfs3":
+ Directory(params.hadoop_pid_dir_prefix,
+ mode=0755,
+ owner=params.root_user,
+ group=params.root_group
+ )
+ else:
+ Directory(params.hadoop_pid_dir_prefix,
+ mode=0755,
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+ if create_pid_dir:
+ Directory(pid_dir,
+ owner=user,
+ create_parents=True)
+ if create_log_dir:
+ if name == "nfs3":
+ Directory(log_dir,
+ mode=0775,
+ owner=params.root_user,
+ group=params.user_group)
+ else:
+ Directory(log_dir,
+ owner=user,
+ create_parents=True)
+
+ if params.security_enabled and name == "datanode":
+ ## The directory where pid files are stored in the secure data environment.
+ hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
+ hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid")
+
+ if params.secure_dn_ports_are_in_use:
+ user = "root"
+ pid_file = format(
+ "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid")
+
+ if action == 'stop' and os.path.isfile(hadoop_secure_dn_pid_file):
+ # We need special handling for this case to handle the situation
+ # when we configure non-root secure DN and then restart it
+ # to handle new configs. Otherwise we will not be able to stop
+ # a running instance
+ user = "root"
+
+ try:
+ check_process_status(hadoop_secure_dn_pid_file)
+
+ custom_export = {
+ 'HADOOP_SECURE_DN_USER': params.hdfs_user
+ }
+ hadoop_env_exports.update(custom_export)
+
+ except ComponentIsNotRunning:
+ pass
+
+ hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")
+
+ if user == "root":
+ cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
+ if options:
+ cmd += [options, ]
+ daemon_cmd = as_sudo(cmd)
+ else:
+ cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
+ if options:
+ cmd += " " + options
+ daemon_cmd = as_user(cmd, user)
+
+ service_is_up = check_process if action == "start" else None
+ #remove pid file from dead process
+ File(pid_file,
+ action="delete",
+ not_if=check_process
+ )
+ Execute(daemon_cmd,
+ not_if=service_is_up,
+ environment=hadoop_env_exports
+ )
+
+ if action == "stop":
+ File(pid_file,
+ action="delete",
+ )
+
+
+def get_value_from_jmx(qry, property):
+ try:
+ response = urllib2.urlopen(qry)
+ data = response.read()
+ if data:
+ data_dict = json.loads(data)
+ return data_dict["beans"][0][property]
+ except:
+ return None
+
+def get_jmx_data(nn_address, modeler_type, metric, encrypted=False, security_enabled=False):
+ """
+ :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already.
+ If not preceded, will use the encrypted param to determine.
+ :param modeler_type: Modeler type to query using startswith function
+ :param metric: Metric to return
+ :return: Return an object representation of the metric, or None if it does not exist
+ """
+ if not nn_address or not modeler_type or not metric:
+ return None
+
+ nn_address = nn_address.strip()
+ if not nn_address.startswith("http"):
+ nn_address = ("https://" if encrypted else "http://") + nn_address
+ if not nn_address.endswith("/"):
+ nn_address = nn_address + "/"
+
+ nn_address = nn_address + "jmx"
+ Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))
+
+ if security_enabled:
+ import params
+ data, error_msg, time_millis = curl_krb_request(params.tmp_dir, params.smoke_user_keytab, params.smokeuser_principal, nn_address,
+ "jn_upgrade", params.kinit_path_local, False, None, params.smoke_user)
+ else:
+ data = urllib2.urlopen(nn_address).read()
+ my_data = None
+ if data:
+ data_dict = json.loads(data)
+ if data_dict:
+ for el in data_dict['beans']:
+ if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
+ if metric in el:
+ my_data = el[metric]
+ if my_data:
+ my_data = json.loads(str(my_data))
+ break
+ return my_data
+
+def get_port(address):
+ """
+ Extracts port from the address like 0.0.0.0:1019
+ """
+ if address is None:
+ return None
+ m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
+ if m is not None and len(m.groups()) >= 2:
+ return int(m.group(2))
+ else:
+ return None
+
+
+def is_secure_port(port):
+ """
+ Returns True if port is root-owned at *nix systems
+ """
+ if port is not None:
+ return port < 1024
+ else:
+ return False
+
+def get_dfsadmin_base_command(hdfs_binary, use_specific_namenode = False):
+ """
+ Get the dfsadmin base command constructed using hdfs_binary path and passing namenode address as explicit -fs argument
+ :param hdfs_binary: path to hdfs binary to use
+ :param use_specific_namenode: flag if set and Namenode HA is enabled, then the dfsadmin command will use
+ current namenode's address
+ :return: the constructed dfsadmin base command
+ """
+ import params
+ dfsadmin_base_command = ""
+ if params.dfs_ha_enabled and use_specific_namenode:
+ dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs hdfs://{params.namenode_rpc}")
+ else:
+ dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs {params.namenode_address}")
+ return dfsadmin_base_command
+
+def is_previous_fs_image():
+ """
+ Return true if there's a previous folder in the HDFS namenode directories.
+ """
+ import params
+ if params.dfs_name_dir:
+ nn_name_dirs = params.dfs_name_dir.split(',')
+ for nn_dir in nn_name_dirs:
+ prev_dir = os.path.join(nn_dir, "previous")
+ if os.path.isdir(prev_dir):
+ return True
+ return False