Posted to commits@ambari.apache.org by sm...@apache.org on 2016/12/09 21:57:27 UTC
[35/51] [abbrv] ambari git commit: AMBARI-19137. HDP 3.0 TP - move ZK, HDFS,
YARN/MR into new common-services version (alejandro)
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/balancer.log
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/balancer.log b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/balancer.log
new file mode 100644
index 0000000..2010c02
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/balancer.log
@@ -0,0 +1,29 @@
+Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved
+Jul 28, 2014 5:01:49 PM 0 0 B 5.74 GB 9.79 GB
+Jul 28, 2014 5:03:00 PM 1 0 B 5.58 GB 9.79 GB
+Jul 28, 2014 5:04:07 PM 2 0 B 5.40 GB 9.79 GB
+Jul 28, 2014 5:05:14 PM 3 0 B 5.06 GB 9.79 GB
+Jul 28, 2014 5:05:50 PM 4 0 B 5.06 GB 9.79 GB
+Jul 28, 2014 5:06:56 PM 5 0 B 4.81 GB 9.79 GB
+Jul 28, 2014 5:07:33 PM 6 0 B 4.80 GB 9.79 GB
+Jul 28, 2014 5:09:11 PM 7 0 B 4.29 GB 9.79 GB
+Jul 28, 2014 5:09:47 PM 8 0 B 4.29 GB 9.79 GB
+Jul 28, 2014 5:11:24 PM 9 0 B 3.89 GB 9.79 GB
+Jul 28, 2014 5:12:00 PM 10 0 B 3.86 GB 9.79 GB
+Jul 28, 2014 5:13:37 PM 11 0 B 3.23 GB 9.79 GB
+Jul 28, 2014 5:15:13 PM 12 0 B 2.53 GB 9.79 GB
+Jul 28, 2014 5:15:49 PM 13 0 B 2.52 GB 9.79 GB
+Jul 28, 2014 5:16:25 PM 14 0 B 2.51 GB 9.79 GB
+Jul 28, 2014 5:17:01 PM 15 0 B 2.39 GB 9.79 GB
+Jul 28, 2014 5:17:37 PM 16 0 B 2.38 GB 9.79 GB
+Jul 28, 2014 5:18:14 PM 17 0 B 2.31 GB 9.79 GB
+Jul 28, 2014 5:18:50 PM 18 0 B 2.30 GB 9.79 GB
+Jul 28, 2014 5:19:26 PM 19 0 B 2.21 GB 9.79 GB
+Jul 28, 2014 5:20:02 PM 20 0 B 2.10 GB 9.79 GB
+Jul 28, 2014 5:20:38 PM 21 0 B 2.06 GB 9.79 GB
+Jul 28, 2014 5:22:14 PM 22 0 B 1.68 GB 9.79 GB
+Jul 28, 2014 5:23:20 PM 23 0 B 1.00 GB 9.79 GB
+Jul 28, 2014 5:23:56 PM 24 0 B 1016.16 MB 9.79 GB
+Jul 28, 2014 5:25:33 PM 25 0 B 30.55 MB 9.79 GB
+The cluster is balanced. Exiting...
+Balancing took 24.858033333333335 minutes
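
Each progress line in this emulated log follows the five-column layout (timestamp, iteration, bytes already moved, bytes left to move, bytes being moved) that hdfs_rebalance.py, added later in this patch, matches with regular expressions. A minimal sketch of pulling the fields out of one such line, using a deliberately simplified pattern rather than that parser's exact MEMORY_PATTERN:

import re

# Simplified, illustrative pattern: timestamp, iteration, then three "<number> <unit>" columns.
PROGRESS = re.compile(
    r'(?P<date>.+?[AP]M)\s+(?P<iteration>\d+)\s+'
    r'(?P<moved>[\d.,]+ [KMGTPE]?B)\s+'
    r'(?P<left>[\d.,]+ [KMGTPE]?B)\s+'
    r'(?P<moving>[\d.,]+ [KMGTPE]?B)')

line = "Jul 28, 2014 5:03:00 PM 1 0 B 5.58 GB 9.79 GB"
m = PROGRESS.match(line)
if m:
    print("iteration %s, still to move %s" % (m.group('iteration'), m.group('left')))
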
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/hdfs-command.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/hdfs-command.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/hdfs-command.py
new file mode 100644
index 0000000..88529b4
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/balancer-emulator/hdfs-command.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import time
+import sys
+from threading import Thread
+
+
+def write_function(path, handle, interval):
+ with open(path) as f:
+ for line in f:
+ handle.write(line)
+ handle.flush()
+ time.sleep(interval)
+
+thread = Thread(target = write_function, args = ('balancer.out', sys.stdout, 1.5))
+thread.start()
+
+threaderr = Thread(target = write_function, args = ('balancer.err', sys.stderr, 1.5 * 0.023))
+threaderr.start()
+
+thread.join()
+
+
+def rebalancer_out():
+ write_function('balancer.out', sys.stdout, 1.5)
+
+def rebalancer_err():
+ write_function('balancer.err', sys.stderr, 1.5 * 0.023)
\ No newline at end of file
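
The emulator above simply replays two pre-recorded files: balancer.out line-by-line to stdout every 1.5 seconds and balancer.err to stderr at a much faster rate, so anything tailing the process sees output shaped like a real "hdfs balancer" run. A hedged sketch of driving it (the working directory is an assumption; the script expects balancer.out and balancer.err to be resolvable as relative paths):

import subprocess

# Run the emulator from the directory that holds balancer.out / balancer.err,
# letting its stdout/stderr flow through to ours.
proc = subprocess.Popen(["python", "hdfs-command.py"], cwd="balancer-emulator")
proc.wait()

Note that only the two module-level threads run when the script executes; the trailing rebalancer_out/rebalancer_err helpers are not called by this flow.
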
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode.py
new file mode 100644
index 0000000..130c021
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode.py
@@ -0,0 +1,178 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import datanode_upgrade
+from hdfs_datanode import datanode
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import conf_select, stack_select
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML
+from resource_management.core.logger import Logger
+from hdfs import hdfs
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
+from utils import get_hdfs_binary
+
+class DataNode(Script):
+
+ def get_component_name(self):
+ return "hadoop-hdfs-datanode"
+
+ def get_hdfs_binary(self):
+ """
+ Get the name or path to the hdfs binary depending on the component name.
+ """
+ component_name = self.get_component_name()
+ return get_hdfs_binary(component_name)
+
+
+ def install(self, env):
+ import params
+ env.set_params(params)
+ self.install_packages(env)
+
+ def configure(self, env):
+ import params
+ env.set_params(params)
+ hdfs("datanode")
+ datanode(action="configure")
+
+ def start(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+ self.configure(env)
+ datanode(action="start")
+
+ def stop(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+ # the rolling-upgrade pre-stop steps shut the DataNode down via dfsadmin; only fall back to a normal stop if that did not happen
+
+ hdfs_binary = self.get_hdfs_binary()
+ if upgrade_type == "rolling":
+ stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary)
+ if not stopped:
+ datanode(action="stop")
+ else:
+ datanode(action="stop")
+
+ def status(self, env):
+ import status_params
+ env.set_params(status_params)
+ datanode(action = "status")
+
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class DataNodeDefault(DataNode):
+
+ def pre_upgrade_restart(self, env, upgrade_type=None):
+ Logger.info("Executing DataNode Stack Upgrade pre-restart")
+ import params
+ env.set_params(params)
+ if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+ conf_select.select(params.stack_name, "hadoop", params.version)
+ stack_select.select("hadoop-hdfs-datanode", params.version)
+
+ def post_upgrade_restart(self, env, upgrade_type=None):
+ Logger.info("Executing DataNode Stack Upgrade post-restart")
+ import params
+ env.set_params(params)
+ hdfs_binary = self.get_hdfs_binary()
+ # ensure the DataNode has started and rejoined the cluster
+ datanode_upgrade.post_upgrade_check(hdfs_binary)
+
+ def security_status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ props_value_check = {"hadoop.security.authentication": "kerberos",
+ "hadoop.security.authorization": "true"}
+ props_empty_check = ["hadoop.security.auth_to_local"]
+ props_read_check = None
+ core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+ props_read_check)
+ props_value_check = None
+ props_empty_check = ['dfs.datanode.keytab.file',
+ 'dfs.datanode.kerberos.principal']
+ props_read_check = ['dfs.datanode.keytab.file']
+ hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+ props_read_check)
+
+ hdfs_expectations = {}
+ hdfs_expectations.update(core_site_expectations)
+ hdfs_expectations.update(hdfs_site_expectations)
+
+ security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+ {'core-site.xml': FILE_TYPE_XML,
+ 'hdfs-site.xml': FILE_TYPE_XML})
+
+ if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+ security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+ result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+ if not result_issues: # If all validations passed successfully
+ try:
+ # Double check the dict before calling execute
+ if ('hdfs-site' not in security_params or
+ 'dfs.datanode.keytab.file' not in security_params['hdfs-site'] or
+ 'dfs.datanode.kerberos.principal' not in security_params['hdfs-site']):
+ self.put_structured_out({"securityState": "UNSECURED"})
+ self.put_structured_out(
+ {"securityIssuesFound": "Keytab file or principal are not set properly."})
+ return
+
+ cached_kinit_executor(status_params.kinit_path_local,
+ status_params.hdfs_user,
+ security_params['hdfs-site']['dfs.datanode.keytab.file'],
+ security_params['hdfs-site']['dfs.datanode.kerberos.principal'],
+ status_params.hostname,
+ status_params.tmp_dir)
+ self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+ except Exception as e:
+ self.put_structured_out({"securityState": "ERROR"})
+ self.put_structured_out({"securityStateErrorInfo": str(e)})
+ else:
+ issues = []
+ for cf in result_issues:
+ issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+ self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+ self.put_structured_out({"securityState": "UNSECURED"})
+ else:
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+ def get_log_folder(self):
+ import params
+ return params.hdfs_log_dir
+
+ def get_user(self):
+ import params
+ return params.hdfs_user
+
+ def get_pid_files(self):
+ import status_params
+ return [status_params.datanode_pid_file]
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class DataNodeWindows(DataNode):
+ def install(self, env):
+ import install_params
+ self.install_packages(env)
+
+if __name__ == "__main__":
+ DataNode().execute()
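
The interesting branch above is stop(): on a rolling upgrade the DataNode is first asked to shut itself down through dfsadmin, and the normal stop only runs as a fallback. A stand-alone sketch of that decision, with a callable standing in for datanode_upgrade.pre_rolling_upgrade_shutdown (names here are illustrative, not part of the script):

def stop_datanode(graceful_shutdown, upgrade_type=None):
    # Prefer the dfsadmin-driven shutdown during a rolling upgrade; fall back
    # to the normal stop if we are not upgrading or the graceful path failed.
    if upgrade_type == "rolling" and graceful_shutdown():
        return "stopped via shutdownDatanode"
    return "stopped via normal daemon stop"

print(stop_datanode(lambda: True, upgrade_type="rolling"))   # graceful path taken
print(stop_datanode(lambda: False, upgrade_type="rolling"))  # falls back to a normal stop
print(stop_datanode(lambda: True))                           # no upgrade: normal stop
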
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode_upgrade.py
new file mode 100644
index 0000000..b55237d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/datanode_upgrade.py
@@ -0,0 +1,156 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Execute
+from resource_management.core import shell
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.decorator import retry
+from resource_management.libraries.functions import check_process_status
+from resource_management.core import ComponentIsNotRunning
+from utils import get_dfsadmin_base_command
+
+
+def pre_rolling_upgrade_shutdown(hdfs_binary):
+ """
+ Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the
+ DataNode in preparation for an upgrade. This will then periodically check
+ "getDatanodeInfo" to ensure the DataNode has shutdown correctly.
+ This function will obtain the Kerberos ticket if security is enabled.
+ :param hdfs_binary: name/path of the HDFS binary to use
+ :return: True if the graceful shutdown ran (even with errors); False if the DataNode needs to be stopped forcefully.
+ """
+ import params
+
+ Logger.info('DataNode executing "shutdownDatanode" command in preparation for upgrade...')
+ if params.security_enabled:
+ Execute(params.dn_kinit_cmd, user = params.hdfs_user)
+
+ dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+ command = format('{dfsadmin_base_command} -shutdownDatanode {dfs_dn_ipc_address} upgrade')
+
+ code, output = shell.call(command, user=params.hdfs_user)
+ if code == 0:
+ # verify that the datanode is down
+ _check_datanode_shutdown(hdfs_binary)
+ else:
+ # Due to bug HDFS-7533, DataNode may not always shutdown during stack upgrade, and it is necessary to kill it.
+ if output is not None and re.search("Shutdown already in progress", output):
+ Logger.error("Due to a known issue in DataNode, the command {0} did not work, so the DataNode will need to be shut down forcefully.".format(command))
+ return False
+ return True
+
+
+def post_upgrade_check(hdfs_binary):
+ """
+ Verifies that the DataNode has rejoined the cluster. This function will
+ obtain the Kerberos ticket if security is enabled.
+ :param hdfs_binary: name/path of the HDFS binary to use
+ :return:
+ """
+ import params
+
+ Logger.info("Checking that the DataNode has rejoined the cluster after upgrade...")
+ if params.security_enabled:
+ Execute(params.dn_kinit_cmd, user=params.hdfs_user)
+
+ # verify that the datanode has started and rejoined the HDFS cluster
+ _check_datanode_startup(hdfs_binary)
+
+
+def is_datanode_process_running():
+ import params
+ try:
+ check_process_status(params.datanode_pid_file)
+ return True
+ except ComponentIsNotRunning:
+ return False
+
+@retry(times=24, sleep_time=5, err_class=Fail)
+def _check_datanode_shutdown(hdfs_binary):
+ """
+ Checks that a DataNode is down by running "hdfs dfsadmin -getDatanodeInfo"
+ several times, pausing in between runs. Once the DataNode stops responding
+ this method will return, otherwise it will raise a Fail(...) and retry
+ automatically.
+ The stack defaults for retrying for HDFS are also way too slow for this
+ command; they are set to wait about 45 seconds between client retries. As
+ a result, a single execution of dfsadmin will take 45 seconds to retry and
+ the DataNode may be marked as dead, causing problems with HBase.
+ https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the
+ times for ipc.client.connect.retry.interval. In the meantime, override them
+ here, but only for RU.
+ :param hdfs_binary: name/path of the HDFS binary to use
+ :return:
+ """
+ import params
+
+ # override stock retry timeouts since after 30 seconds, the datanode is
+ # marked as dead and can affect HBase during RU
+ dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+ command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
+
+ try:
+ Execute(command, user=params.hdfs_user, tries=1)
+ except:
+ Logger.info("DataNode has successfully shutdown for upgrade.")
+ return
+
+ Logger.info("DataNode has not shutdown.")
+ raise Fail('DataNode has not shutdown.')
+
+
+@retry(times=30, sleep_time=30, err_class=Fail) # keep trying for 15 mins
+def _check_datanode_startup(hdfs_binary):
+ """
+ Checks that a DataNode process is running and DataNode is reported as being alive via the
+ "hdfs dfsadmin -fs {namenode_address} -report -live" command. Once the DataNode is found to be
+ alive this method will return, otherwise it will raise a Fail(...) and retry
+ automatically.
+ :param hdfs_binary: name/path of the HDFS binary to use
+ :return:
+ """
+
+ if not is_datanode_process_running():
+ Logger.info("DataNode process is not running")
+ raise Fail("DataNode process is not running")
+
+ import params
+ import socket
+
+ try:
+ dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+ command = dfsadmin_base_command + ' -report -live'
+ return_code, hdfs_output = shell.call(command, user=params.hdfs_user)
+ except:
+ raise Fail('Unable to determine if the DataNode has started after upgrade.')
+
+ if return_code == 0:
+ hostname = params.hostname.lower()
+ hostname_ip = socket.gethostbyname(params.hostname.lower())
+ if hostname in hdfs_output.lower() or hostname_ip in hdfs_output.lower():
+ Logger.info("DataNode {0} reports that it has rejoined the cluster.".format(params.hostname))
+ return
+ else:
+ raise Fail("DataNode {0} was not found in the list of live DataNodes".format(params.hostname))
+
+ # return_code is not 0, fail
+ raise Fail("Unable to determine if the DataNode has started after upgrade (result code {0})".format(str(return_code)))
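
Both checks above lean on the library's @retry decorator: the wrapped function raises Fail while the condition is unmet, and the decorator re-invokes it up to `times` attempts with `sleep_time` seconds in between (optionally growing by a backoff factor). A rough stand-alone sketch of that pattern, not the resource_management implementation:

import functools
import time

class Fail(Exception):
    pass

def retry(times=3, sleep_time=1, err_class=Fail):
    # Re-run the decorated function until it stops raising err_class,
    # re-raising the last error after `times` attempts.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return func(*args, **kwargs)
                except err_class:
                    if attempt == times - 1:
                        raise
                    time.sleep(sleep_time)
        return wrapper
    return decorator

@retry(times=5, sleep_time=0.1)
def check_shutdown():
    # Stand-in for shelling out to "hdfs dfsadmin ... -getDatanodeInfo":
    # keep failing while the DataNode still responds.
    raise Fail("DataNode has not shut down.")
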
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs.py
new file mode 100644
index 0000000..d9b62e2
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs.py
@@ -0,0 +1,178 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.libraries.script.script import Script
+from resource_management.core.resources.system import Directory, File, Link
+from resource_management.core.resources import Package
+from resource_management.core.source import Template
+from resource_management.core.resources.service import ServiceConfig
+from resource_management.libraries.resources.xml_config import XmlConfig
+import os
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+from ambari_commons import OSConst
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def hdfs(name=None):
+ import params
+
+ if params.create_lib_snappy_symlinks:
+ install_snappy()
+
+ # On some operating systems this directory may not exist, so create it before placing files in it
+ Directory(params.limits_conf_dir,
+ create_parents = True,
+ owner='root',
+ group='root'
+ )
+
+ File(os.path.join(params.limits_conf_dir, 'hdfs.conf'),
+ owner='root',
+ group='root',
+ mode=0644,
+ content=Template("hdfs.conf.j2")
+ )
+
+ if params.security_enabled:
+ tc_mode = 0644
+ tc_owner = "root"
+ else:
+ tc_mode = None
+ tc_owner = params.hdfs_user
+
+ if "hadoop-policy" in params.config['configurations']:
+ XmlConfig("hadoop-policy.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['hadoop-policy'],
+ configuration_attributes=params.config['configuration_attributes']['hadoop-policy'],
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+
+ if "ssl-client" in params.config['configurations']:
+ XmlConfig("ssl-client.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['ssl-client'],
+ configuration_attributes=params.config['configuration_attributes']['ssl-client'],
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+
+ Directory(params.hadoop_conf_secure_dir,
+ create_parents = True,
+ owner='root',
+ group=params.user_group,
+ cd_access='a',
+ )
+
+ XmlConfig("ssl-client.xml",
+ conf_dir=params.hadoop_conf_secure_dir,
+ configurations=params.config['configurations']['ssl-client'],
+ configuration_attributes=params.config['configuration_attributes']['ssl-client'],
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+
+ if "ssl-server" in params.config['configurations']:
+ XmlConfig("ssl-server.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['ssl-server'],
+ configuration_attributes=params.config['configuration_attributes']['ssl-server'],
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+
+ XmlConfig("hdfs-site.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['hdfs-site'],
+ configuration_attributes=params.config['configuration_attributes']['hdfs-site'],
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+
+ XmlConfig("core-site.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['core-site'],
+ configuration_attributes=params.config['configuration_attributes']['core-site'],
+ owner=params.hdfs_user,
+ group=params.user_group,
+ mode=0644
+ )
+
+ File(os.path.join(params.hadoop_conf_dir, 'slaves'),
+ owner=tc_owner,
+ content=Template("slaves.j2")
+ )
+
+ if params.lzo_enabled and len(params.lzo_packages) > 0:
+ Package(params.lzo_packages,
+ retry_on_repo_unavailability=params.agent_stack_retry_on_unavailability,
+ retry_count=params.agent_stack_retry_count)
+
+def install_snappy():
+ import params
+ Directory([params.so_target_dir_x86, params.so_target_dir_x64],
+ create_parents = True,
+ )
+ Link(params.so_target_x86,
+ to=params.so_src_x86,
+ )
+ Link(params.so_target_x64,
+ to=params.so_src_x64,
+ )
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def hdfs(component=None):
+ import params
+ if component == "namenode":
+ directories = params.dfs_name_dir.split(",")
+ Directory(directories,
+ owner=params.hdfs_user,
+ mode="(OI)(CI)F",
+ create_parents = True
+ )
+ File(params.exclude_file_path,
+ content=Template("exclude_hosts_list.j2"),
+ owner=params.hdfs_user,
+ mode="f",
+ )
+ if component in params.service_map:
+ service_name = params.service_map[component]
+ ServiceConfig(service_name,
+ action="change_user",
+ username=params.hdfs_user,
+ password=Script.get_password(params.hdfs_user))
+
+ if "hadoop-policy" in params.config['configurations']:
+ XmlConfig("hadoop-policy.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['hadoop-policy'],
+ owner=params.hdfs_user,
+ mode="f",
+ configuration_attributes=params.config['configuration_attributes']['hadoop-policy']
+ )
+
+ XmlConfig("hdfs-site.xml",
+ conf_dir=params.hadoop_conf_dir,
+ configurations=params.config['configurations']['hdfs-site'],
+ owner=params.hdfs_user,
+ mode="f",
+ configuration_attributes=params.config['configuration_attributes']['hdfs-site']
+ )
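
Most of the hdfs() function above is XmlConfig resources that turn the `configurations` dictionaries from the command JSON into Hadoop *-site.xml files with the right owner, group and mode. A hedged sketch of just the rendering step those resources perform (ignoring configuration_attributes, ownership and permissions):

import xml.etree.ElementTree as ET

def render_hadoop_xml(properties):
    # Render a flat dict of properties into Hadoop's <configuration> format.
    root = ET.Element("configuration")
    for name, value in sorted(properties.items()):
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(root).decode("utf-8")

# A tiny stand-in for params.config['configurations']['core-site']:
print(render_hadoop_xml({"fs.defaultFS": "hdfs://example-nn:8020",
                         "hadoop.security.authentication": "kerberos"}))
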
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_client.py
new file mode 100644
index 0000000..4dabdbc
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_client.py
@@ -0,0 +1,122 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import conf_select, stack_select
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+ FILE_TYPE_XML
+from hdfs import hdfs
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
+from resource_management.core.exceptions import ClientComponentHasNoStatus
+
+class HdfsClient(Script):
+
+ def install(self, env):
+ import params
+ env.set_params(params)
+ self.install_packages(env)
+ self.configure(env)
+
+ def configure(self, env):
+ import params
+ env.set_params(params)
+ hdfs()
+
+ def start(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ def stop(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ def status(self, env):
+ raise ClientComponentHasNoStatus()
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class HdfsClientDefault(HdfsClient):
+
+ def get_component_name(self):
+ return "hadoop-client"
+
+ def pre_upgrade_restart(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+ if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+ conf_select.select(params.stack_name, "hadoop", params.version)
+ stack_select.select("hadoop-client", params.version)
+
+ def security_status(self, env):
+ import status_params
+ env.set_params(status_params)
+
+ props_value_check = {"hadoop.security.authentication": "kerberos",
+ "hadoop.security.authorization": "true"}
+ props_empty_check = ["hadoop.security.auth_to_local"]
+ props_read_check = None
+ core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+ props_read_check)
+ hdfs_expectations ={}
+ hdfs_expectations.update(core_site_expectations)
+
+ security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+ {'core-site.xml': FILE_TYPE_XML})
+
+ if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+ security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+ result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+ if not result_issues: # If all validations passed successfully
+ if status_params.hdfs_user_principal or status_params.hdfs_user_keytab:
+ try:
+ cached_kinit_executor(status_params.kinit_path_local,
+ status_params.hdfs_user,
+ status_params.hdfs_user_keytab,
+ status_params.hdfs_user_principal,
+ status_params.hostname,
+ status_params.tmp_dir)
+ self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+ except Exception as e:
+ self.put_structured_out({"securityState": "ERROR"})
+ self.put_structured_out({"securityStateErrorInfo": str(e)})
+ else:
+ self.put_structured_out({"securityIssuesFound": "hdfs principal and/or keytab file is not specified"})
+ self.put_structured_out({"securityState": "UNSECURED"})
+ else:
+ issues = []
+ for cf in result_issues:
+ issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+ self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+ else:
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class HdfsClientWindows(HdfsClient):
+ def install(self, env):
+ import install_params
+ self.install_packages(env)
+ self.configure(env)
+
+if __name__ == "__main__":
+ HdfsClient().execute()
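
security_status() here and in datanode.py follows the same recipe: declare expectations for the relevant *-site files, read the live XML from disk with get_params_from_filesystem, and only attempt a kinit once validation passes. A simplified sketch of the validation half for a single config file, assuming the value-check / must-not-be-empty split used above:

def validate_site(site_config, value_checks=None, empty_checks=None):
    # Return a list of human-readable problems found in one *-site dict.
    issues = []
    for prop, expected in (value_checks or {}).items():
        if site_config.get(prop) != expected:
            issues.append("%s should be set to %s" % (prop, expected))
    for prop in (empty_checks or []):
        if not site_config.get(prop):
            issues.append("%s must not be empty" % prop)
    return issues

core_site = {"hadoop.security.authentication": "kerberos",
             "hadoop.security.authorization": "true",
             "hadoop.security.auth_to_local": "DEFAULT"}
print(validate_site(core_site,
                    value_checks={"hadoop.security.authentication": "kerberos",
                                  "hadoop.security.authorization": "true"},
                    empty_checks=["hadoop.security.auth_to_local"]))  # -> []
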
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_datanode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_datanode.py
new file mode 100644
index 0000000..2d3d4f5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_datanode.py
@@ -0,0 +1,85 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+from resource_management.core.resources.system import Directory, Execute, File
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions.mounted_dirs_helper import handle_mounted_dirs
+from utils import service
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from ambari_commons import OSCheck, OSConst
+from resource_management.core.resources.service import Service
+
+if OSCheck.is_windows_family():
+ from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
+
+
+def create_dirs(data_dir):
+ """
+ :param data_dir: The directory to create
+ :param params: parameters
+ """
+ import params
+ Directory(data_dir,
+ create_parents = True,
+ cd_access="a",
+ mode=0755,
+ owner=params.hdfs_user,
+ group=params.user_group,
+ ignore_failures=True
+ )
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def datanode(action=None):
+ if action == "configure":
+ import params
+ Directory(params.dfs_domain_socket_dir,
+ create_parents = True,
+ mode=0751,
+ owner=params.hdfs_user,
+ group=params.user_group)
+
+ # handle_mounted_dirs ensures that we don't create dfs data dirs which are temporarily unavailable (unmounted) and intended to reside on a different mount.
+ data_dir_to_mount_file_content = handle_mounted_dirs(create_dirs, params.dfs_data_dirs, params.data_dir_mount_file, params)
+ # create a history file used by handle_mounted_dirs
+ File(params.data_dir_mount_file,
+ owner=params.hdfs_user,
+ group=params.user_group,
+ mode=0644,
+ content=data_dir_to_mount_file_content
+ )
+
+ elif action == "start" or action == "stop":
+ import params
+ service(
+ action=action, name="datanode",
+ user=params.hdfs_user,
+ create_pid_dir=True,
+ create_log_dir=True
+ )
+ elif action == "status":
+ import status_params
+ check_process_status(status_params.datanode_pid_file)
+
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def datanode(action=None):
+ if action == "configure":
+ pass
+ elif(action == "start" or action == "stop"):
+ import params
+ Service(params.datanode_win_service_name, action=action)
+ elif action == "status":
+ import status_params
+ check_windows_service_status(status_params.datanode_win_service_name)
\ No newline at end of file
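
handle_mounted_dirs is what keeps the DataNode from quietly recreating a dfs.datanode.data.dir on the root filesystem when its intended mount has dropped: a history file records which mount each data dir last lived on, and create_dirs is only invoked when the current mount still matches. A rough sketch of that guard, assuming a plain dict stands in for the history file (the real helper's file format and edge cases differ):

import os

def current_mount_point(path):
    # Walk up from `path` (which may not exist yet) to the nearest mount point.
    path = os.path.abspath(path)
    while not os.path.ismount(path):
        path = os.path.dirname(path)
    return path

def safe_create(data_dir, last_known_mounts, create_fn):
    # Only (re)create data_dir if it is still backed by the mount it was last
    # seen on; otherwise skip it so we do not fill up the wrong volume.
    mount = current_mount_point(data_dir)
    previous = last_known_mounts.get(data_dir)
    if previous is not None and previous != mount:
        return False
    create_fn(data_dir)
    last_known_mounts[data_dir] = mount
    return True
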
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_namenode.py
new file mode 100644
index 0000000..23119f0
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_namenode.py
@@ -0,0 +1,562 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os.path
+import time
+
+from resource_management.core import shell
+from resource_management.core.source import Template
+from resource_management.core.resources.system import File, Execute, Directory
+from resource_management.core.resources.service import Service
+from resource_management.libraries.functions import namenode_ha_utils
+from resource_management.libraries.functions.decorator import retry
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop
+from resource_management.libraries.functions import Direction
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from utils import get_dfsadmin_base_command
+
+if OSCheck.is_windows_family():
+ from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
+
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+
+from utils import service, safe_zkfc_op, is_previous_fs_image
+from setup_ranger_hdfs import setup_ranger_hdfs, create_ranger_audit_hdfs_directories
+
+import namenode_upgrade
+
+def wait_for_safemode_off(hdfs_binary, afterwait_sleep=0, execute_kinit=False):
+ """
+ During a NonRolling (aka Express) Upgrade, the NameNode is started while still in safemode; after all of the
+ DataNodes are started, the NameNode needs to receive all of the block reports and leave safemode.
+ If HA is present, then this command runs individually on each NameNode, each checking its own address.
+ """
+ import params
+
+ retries = 115
+ sleep_seconds = 10
+ sleep_minutes = int(sleep_seconds * retries / 60)
+
+ Logger.info("Waiting up to {0} minutes for the NameNode to leave Safemode...".format(sleep_minutes))
+
+ if params.security_enabled and execute_kinit:
+ kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
+ Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+
+ try:
+ # Note, this fails if namenode_address isn't prefixed with "params."
+
+ dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary, use_specific_namenode=True)
+ is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'"
+
+ # Wait up to sleep_minutes (roughly 19 minutes with the defaults above)
+ Execute(is_namenode_safe_mode_off, tries=retries, try_sleep=sleep_seconds,
+ user=params.hdfs_user, logoutput=True)
+
+ # Wait a bit more since YARN still depends on block reports coming in.
+ # Also saw intermittent errors with HBASE service check if it was done too soon.
+ time.sleep(afterwait_sleep)
+ except Fail:
+ Logger.error("The NameNode is still in Safemode. Please be careful with commands that need Safemode OFF.")
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None,
+ upgrade_suspended=False, env=None):
+
+ if action is None:
+ raise Fail('"action" parameter is required for function namenode().')
+
+ if action in ["start", "stop"] and hdfs_binary is None:
+ raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
+ if action == "configure":
+ import params
+ # we need this directory to be present before any action (HA manual steps for
+ # the additional NameNode)
+ create_name_dirs(params.dfs_name_dir)
+ elif action == "start":
+ Logger.info("Called service {0} with upgrade_type: {1}".format(action, str(upgrade_type)))
+ setup_ranger_hdfs(upgrade_type=upgrade_type)
+ import params
+ if do_format and not params.hdfs_namenode_format_disabled:
+ format_namenode()
+ pass
+
+ File(params.exclude_file_path,
+ content=Template("exclude_hosts_list.j2"),
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+
+ if params.dfs_ha_enabled and \
+ params.dfs_ha_namenode_standby is not None and \
+ params.hostname == params.dfs_ha_namenode_standby:
+ # if the current host is the standby NameNode in an HA deployment
+ # run the bootstrap command, to start the NameNode in standby mode
+ # this requires that the active NameNode is already up and running,
+ # so this execute should be re-tried upon failure, up to a timeout
+ success = bootstrap_standby_namenode(params)
+ if not success:
+ raise Fail("Could not bootstrap standby namenode")
+
+ if upgrade_type == "rolling" and params.dfs_ha_enabled:
+ # Most likely, ZKFC is up since RU will initiate the failover command. However, if that failed, it would have tried
+ # to kill ZKFC manually, so we need to start it if not already running.
+ safe_zkfc_op(action, env)
+
+ options = ""
+ if upgrade_type == "rolling":
+ if params.upgrade_direction == Direction.UPGRADE:
+ options = "-rollingUpgrade started"
+ elif params.upgrade_direction == Direction.DOWNGRADE:
+ options = "-rollingUpgrade downgrade"
+ elif upgrade_type == "nonrolling":
+ is_previous_image_dir = is_previous_fs_image()
+ Logger.info("Previous file system image dir present: {0}".format(str(is_previous_image_dir)))
+
+ if params.upgrade_direction == Direction.UPGRADE:
+ options = "-rollingUpgrade started"
+ elif params.upgrade_direction == Direction.DOWNGRADE:
+ options = "-rollingUpgrade downgrade"
+ elif upgrade_type is None and upgrade_suspended is True:
+ # the rollingUpgrade flag must be passed in during a suspended upgrade when starting NN
+ if os.path.exists(namenode_upgrade.get_upgrade_in_progress_marker()):
+ options = "-rollingUpgrade started"
+ else:
+ Logger.info("The NameNode upgrade marker file {0} does not exist, yet an upgrade is currently suspended. "
+ "Assuming that the upgrade of NameNode has not occurred yet.".format(namenode_upgrade.get_upgrade_in_progress_marker()))
+
+ Logger.info("Options for start command are: {0}".format(options))
+
+ service(
+ action="start",
+ name="namenode",
+ user=params.hdfs_user,
+ options=options,
+ create_pid_dir=True,
+ create_log_dir=True
+ )
+
+ if params.security_enabled:
+ Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+ user = params.hdfs_user)
+
+ # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____|
+ # no-HA | ON -> OFF | Yes |
+ # HA and active | ON -> OFF | Yes |
+ # HA and standby | no change | No |
+ # RU with HA on active | ON -> OFF | Yes |
+ # RU with HA on standby | ON -> OFF | Yes |
+ # EU with HA on active | ON -> OFF | No |
+ # EU with HA on standby | ON -> OFF | No |
+ # EU non-HA | ON -> OFF | No |
+
+ # because we do things like create directories after starting NN,
+ # the vast majority of the time this should be True - it should only
+ # be False if this is HA and we are the Standby NN
+ ensure_safemode_off = True
+
+ # True if this is the only NameNode (non-HA) or if its the Active one in HA
+ is_active_namenode = True
+
+ if params.dfs_ha_enabled:
+ Logger.info("Waiting for the NameNode to broadcast whether it is Active or Standby...")
+
+ if is_this_namenode_active() is False:
+ # we are the STANDBY NN
+ is_active_namenode = False
+
+ # we are the STANDBY NN and this restart is not part of an upgrade
+ if upgrade_type is None:
+ ensure_safemode_off = False
+
+
+ # During an Express Upgrade, NameNode will not leave SafeMode until the DataNodes are started,
+ # so always disable the Safemode check
+ if upgrade_type == "nonrolling":
+ ensure_safemode_off = False
+
+ # some informative logging separate from the above logic to keep things a little cleaner
+ if ensure_safemode_off:
+ Logger.info("Waiting for this NameNode to leave Safemode due to the following conditions: HA: {0}, isActive: {1}, upgradeType: {2}".format(
+ params.dfs_ha_enabled, is_active_namenode, upgrade_type))
+ else:
+ Logger.info("Skipping Safemode check due to the following conditions: HA: {0}, isActive: {1}, upgradeType: {2}".format(
+ params.dfs_ha_enabled, is_active_namenode, upgrade_type))
+
+
+ # wait for Safemode to end
+ if ensure_safemode_off:
+ wait_for_safemode_off(hdfs_binary)
+
+ # Always run this on the "Active" NN unless Safemode has been ignored
+ # in the case where safemode was ignored (like during an express upgrade), then
+ # NN will be in SafeMode and cannot have directories created
+ if is_active_namenode and ensure_safemode_off:
+ create_hdfs_directories()
+ create_ranger_audit_hdfs_directories()
+ else:
+ Logger.info("Skipping creation of HDFS directories since this is either not the Active NameNode or we did not wait for Safemode to finish.")
+
+ elif action == "stop":
+ import params
+ service(
+ action="stop", name="namenode",
+ user=params.hdfs_user
+ )
+ elif action == "status":
+ import status_params
+ check_process_status(status_params.namenode_pid_file)
+ elif action == "decommission":
+ decommission()
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None,
+ upgrade_suspended=False, env=None):
+
+ if action is None:
+ raise Fail('"action" parameter is required for function namenode().')
+
+ if action in ["start", "stop"] and hdfs_binary is None:
+ raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
+ if action == "configure":
+ pass
+ elif action == "start":
+ import params
+ #TODO: Replace with format_namenode()
+ namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED")
+ if not os.path.exists(namenode_format_marker):
+ hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
+ Execute("%s namenode -format" % (hadoop_cmd))
+ open(namenode_format_marker, 'a').close()
+ Service(params.namenode_win_service_name, action=action)
+ elif action == "stop":
+ import params
+ Service(params.namenode_win_service_name, action=action)
+ elif action == "status":
+ import status_params
+ check_windows_service_status(status_params.namenode_win_service_name)
+ elif action == "decommission":
+ decommission()
+
+def create_name_dirs(directories):
+ import params
+
+ dirs = directories.split(",")
+ Directory(dirs,
+ mode=0755,
+ owner=params.hdfs_user,
+ group=params.user_group,
+ create_parents = True,
+ cd_access="a",
+ )
+
+
+def create_hdfs_directories():
+ import params
+
+ params.HdfsResource(params.hdfs_tmp_dir,
+ type="directory",
+ action="create_on_execute",
+ owner=params.hdfs_user,
+ mode=0777,
+ )
+ params.HdfsResource(params.smoke_hdfs_user_dir,
+ type="directory",
+ action="create_on_execute",
+ owner=params.smoke_user,
+ mode=params.smoke_hdfs_user_mode,
+ )
+ params.HdfsResource(None,
+ action="execute",
+ )
+
+def format_namenode(force=None):
+ import params
+
+ old_mark_dir = params.namenode_formatted_old_mark_dirs
+ mark_dir = params.namenode_formatted_mark_dirs
+ dfs_name_dir = params.dfs_name_dir
+ hdfs_user = params.hdfs_user
+ hadoop_conf_dir = params.hadoop_conf_dir
+
+ if not params.dfs_ha_enabled:
+ if force:
+ ExecuteHadoop('namenode -format',
+ bin_dir=params.hadoop_bin_dir,
+ conf_dir=hadoop_conf_dir)
+ else:
+ if not is_namenode_formatted(params):
+ Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"),
+ user = params.hdfs_user,
+ path = [params.hadoop_bin_dir]
+ )
+ for m_dir in mark_dir:
+ Directory(m_dir,
+ create_parents = True
+ )
+ else:
+ if params.dfs_ha_namenode_active is not None and \
+ params.hostname == params.dfs_ha_namenode_active:
+ # check and run the format command in the HA deployment scenario
+ # only format the "active" namenode in an HA deployment
+ if force:
+ ExecuteHadoop('namenode -format',
+ bin_dir=params.hadoop_bin_dir,
+ conf_dir=hadoop_conf_dir)
+ else:
+ nn_name_dirs = params.dfs_name_dir.split(',')
+ if not is_namenode_formatted(params):
+ try:
+ Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"),
+ user = params.hdfs_user,
+ path = [params.hadoop_bin_dir]
+ )
+ except Fail:
+ # We need to clean-up mark directories, so we can re-run format next time.
+ for nn_name_dir in nn_name_dirs:
+ Execute(format("rm -rf {nn_name_dir}/*"),
+ user = params.hdfs_user,
+ )
+ raise
+ for m_dir in mark_dir:
+ Directory(m_dir,
+ create_parents = True
+ )
+
+def is_namenode_formatted(params):
+ old_mark_dirs = params.namenode_formatted_old_mark_dirs
+ mark_dirs = params.namenode_formatted_mark_dirs
+ nn_name_dirs = params.dfs_name_dir.split(',')
+ marked = False
+ # Check if name directories have been marked as formatted
+ for mark_dir in mark_dirs:
+ if os.path.isdir(mark_dir):
+ marked = True
+ Logger.info(format("{mark_dir} exists. Namenode DFS already formatted"))
+
+ # Ensure that all mark dirs created for all name directories
+ if marked:
+ for mark_dir in mark_dirs:
+ Directory(mark_dir,
+ create_parents = True
+ )
+ return marked
+
+ # Move all old format markers to new place
+ for old_mark_dir in old_mark_dirs:
+ if os.path.isdir(old_mark_dir):
+ for mark_dir in mark_dirs:
+ Execute(('cp', '-ar', old_mark_dir, mark_dir),
+ sudo = True
+ )
+ marked = True
+ Directory(old_mark_dir,
+ action = "delete"
+ )
+ elif os.path.isfile(old_mark_dir):
+ for mark_dir in mark_dirs:
+ Directory(mark_dir,
+ create_parents = True,
+ )
+ Directory(old_mark_dir,
+ action = "delete"
+ )
+ marked = True
+
+ if marked:
+ return True
+
+ # Check if name dirs are not empty
+ for name_dir in nn_name_dirs:
+ code, out = shell.call(("ls", name_dir))
+ dir_exists_and_valid = bool(not code)
+
+ if not dir_exists_and_valid: # situations if disk exists but is crashed at the moment (ls: reading directory ...: Input/output error)
+ Logger.info(format("NameNode will not be formatted because the directory {name_dir} is missing or cannot be checked for content. {out}"))
+ return True
+
+ try:
+ Execute(format("ls {name_dir} | wc -l | grep -q ^0$"),
+ )
+ except Fail:
+ Logger.info(format("NameNode will not be formatted since {name_dir} exists and contains content"))
+ return True
+
+ return False
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def decommission():
+ import params
+
+ hdfs_user = params.hdfs_user
+ conf_dir = params.hadoop_conf_dir
+ user_group = params.user_group
+ nn_kinit_cmd = params.nn_kinit_cmd
+
+ File(params.exclude_file_path,
+ content=Template("exclude_hosts_list.j2"),
+ owner=hdfs_user,
+ group=user_group
+ )
+
+ if not params.update_exclude_file_only:
+ Execute(nn_kinit_cmd,
+ user=hdfs_user
+ )
+
+ if params.dfs_ha_enabled:
+ # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
+ # need to execute each command scoped to a particular namenode
+ nn_refresh_cmd = format('dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
+ else:
+ nn_refresh_cmd = format('dfsadmin -fs {namenode_address} -refreshNodes')
+ ExecuteHadoop(nn_refresh_cmd,
+ user=hdfs_user,
+ conf_dir=conf_dir,
+ bin_dir=params.hadoop_bin_dir)
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def decommission():
+ import params
+ hdfs_user = params.hdfs_user
+ conf_dir = params.hadoop_conf_dir
+
+ File(params.exclude_file_path,
+ content=Template("exclude_hosts_list.j2"),
+ owner=hdfs_user
+ )
+
+ if params.dfs_ha_enabled:
+ # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
+ # need to execute each command scoped to a particular namenode
+ nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
+ else:
+ nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs {namenode_address} -refreshNodes')
+ Execute(nn_refresh_cmd, user=hdfs_user)
+
+
+def bootstrap_standby_namenode(params, use_path=False):
+ mark_dirs = params.namenode_bootstrapped_mark_dirs
+ bin_path = os.path.join(params.hadoop_bin_dir, '') if use_path else ""
+ try:
+ iterations = 50
+ bootstrapped = False
+ bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive")
+ # Blue print based deployments start both NN in parallel and occasionally
+ # the first attempt to bootstrap may fail. Depending on how it fails the
+ # second attempt may not succeed (e.g. it may find the folder and decide that
+ # bootstrap succeeded). The solution is to call with -force option but only
+ # during initial start
+ if params.command_phase == "INITIAL_START":
+ # force bootstrap in INITIAL_START phase
+ bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive -force")
+ elif is_namenode_bootstrapped(params):
+ # Once out of the INITIAL_START phase, bootstrap only if we couldn't bootstrap during cluster deployment
+ return True
+ Logger.info("Bootstrapping standby namenode: %s" % (bootstrap_cmd))
+ for i in range(iterations):
+ Logger.info('Try %d out of %d' % (i+1, iterations))
+ code, out = shell.call(bootstrap_cmd, logoutput=False, user=params.hdfs_user)
+ if code == 0:
+ Logger.info("Standby namenode bootstrapped successfully")
+ bootstrapped = True
+ break
+ elif code == 5:
+ Logger.info("Standby namenode already bootstrapped")
+ bootstrapped = True
+ break
+ else:
+ Logger.warning('Bootstrap standby namenode failed with error code %d. Will retry' % (code))
+ except Exception as ex:
+ Logger.error('Bootstrap standby namenode threw an exception. Reason %s' %(str(ex)))
+ if bootstrapped:
+ for mark_dir in mark_dirs:
+ Directory(mark_dir,
+ create_parents = True
+ )
+ return bootstrapped
+
+def is_namenode_bootstrapped(params):
+ mark_dirs = params.namenode_bootstrapped_mark_dirs
+ nn_name_dirs = params.dfs_name_dir.split(',')
+ marked = False
+ # Check if name directories have been marked as formatted
+ for mark_dir in mark_dirs:
+ if os.path.isdir(mark_dir):
+ marked = True
+ Logger.info(format("{mark_dir} exists. Standby Namenode already bootstrapped"))
+ break
+
+ # Ensure that all mark dirs created for all name directories
+ if marked:
+ for mark_dir in mark_dirs:
+ Directory(mark_dir,
+ create_parents = True
+ )
+
+ return marked
+
+
+@retry(times=125, sleep_time=5, backoff_factor=2, err_class=Fail)
+def is_this_namenode_active():
+ """
+ Gets whether the current NameNode is Active. This function will wait until the NameNode is
+ listed as being either Active or Standby before returning a value. This is to ensure that
+ that if the other NameNode is Active, we ensure that this NameNode has fully loaded and
+ registered in the event that the other NameNode is going to be restarted. This prevents
+ a situation where we detect the other NameNode as Active before this NameNode has fully booted.
+ If the other Active NameNode is then restarted, there can be a loss of service if this
+ NameNode has not entered Standby.
+ """
+ import params
+
+ # returns ([('nn1', 'c6401.ambari.apache.org:50070')], [('nn2', 'c6402.ambari.apache.org:50070')], [])
+ # 0 1 2
+ # or
+ # returns ([], [('nn1', 'c6401.ambari.apache.org:50070')], [('nn2', 'c6402.ambari.apache.org:50070')], [])
+ # 0 1 2
+ #
+ namenode_states = namenode_ha_utils.get_namenode_states(params.hdfs_site, params.security_enabled,
+ params.hdfs_user, times=5, sleep_time=5, backoff_factor=2)
+
+ # unwraps [('nn1', 'c6401.ambari.apache.org:50070')]
+ active_namenodes = [] if len(namenode_states[0]) < 1 else namenode_states[0]
+
+ # unwraps [('nn2', 'c6402.ambari.apache.org:50070')]
+ standby_namenodes = [] if len(namenode_states[1]) < 1 else namenode_states[1]
+
+ # check to see if this is the active NameNode
+ for entry in active_namenodes:
+ if params.namenode_id in entry:
+ return True
+
+ # if this is not the active NameNode, then we must wait for it to register as standby
+ for entry in standby_namenodes:
+ if params.namenode_id in entry:
+ return False
+
+ # at this point, this NameNode is neither active nor standby - we must wait to ensure it
+ # enters at least one of these roles before returning a verdict - the annotation will catch
+ # this failure and retry the function automatically
+ raise Fail(format("The NameNode {namenode_id} is not listed as Active or Standby, waiting..."))
\ No newline at end of file
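
wait_for_safemode_off above shells out to "hdfs dfsadmin -safemode get" and greps for "Safe mode is OFF", retrying 115 times at 10-second intervals (roughly 19 minutes). A minimal stand-alone sketch of that polling loop with subprocess; the binary name and retry budget are assumptions here, not the stack's computed values:

import subprocess
import time

def wait_for_safemode_off(hdfs_binary="hdfs", retries=115, sleep_seconds=10):
    # Poll "hdfs dfsadmin -safemode get" until it reports OFF or we give up.
    for _ in range(retries):
        try:
            out = subprocess.check_output([hdfs_binary, "dfsadmin", "-safemode", "get"])
        except (OSError, subprocess.CalledProcessError):
            out = b""
        if b"Safe mode is OFF" in out:
            return True
        time.sleep(sleep_seconds)
    return False
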
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_nfsgateway.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_nfsgateway.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_nfsgateway.py
new file mode 100644
index 0000000..672312a
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_nfsgateway.py
@@ -0,0 +1,75 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+from resource_management.core.resources import Directory
+from resource_management.core import shell
+from utils import service
+import subprocess,os
+
+# NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
+# on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
+
+def prepare_rpcbind():
+ Logger.info("check if native nfs server is running")
+ p, output = shell.call("pgrep nfsd")
+ if p == 0 :
+ Logger.info("native nfs server is running. shutting it down...")
+ # shutdown nfs
+ shell.call("service nfs stop")
+ shell.call("service nfs-kernel-server stop")
+ Logger.info("check if the native nfs server is down...")
+ p, output = shell.call("pgrep nfsd")
+ if p == 0 :
+ raise Fail("Failed to shutdown native nfs service")
+
+ Logger.info("check if rpcbind or portmap is running")
+ p, output = shell.call("pgrep rpcbind")
+ q, output = shell.call("pgrep portmap")
+
+ if p!=0 and q!=0 :
+ Logger.info("no portmap or rpcbind running. starting one...")
+ p, output = shell.call(("service", "rpcbind", "start"), sudo=True)
+ q, output = shell.call(("service", "portmap", "start"), sudo=True)
+ if p!=0 and q!=0 :
+ raise Fail("Failed to start rpcbind or portmap")
+
+ Logger.info("now we are ready to start nfs gateway")
+
+
+def nfsgateway(action=None, format=False):
+ import params
+
+ if action== "start":
+ prepare_rpcbind()
+
+ if action == "configure":
+ Directory(params.nfs_file_dump_dir,
+ owner = params.hdfs_user,
+ group = params.user_group,
+ )
+ elif action == "start" or action == "stop":
+ service(
+ action=action,
+ name="nfs3",
+ user=params.root_user,
+ create_pid_dir=True,
+ create_log_dir=True
+ )
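The prepare_rpcbind() helper above relies on pgrep's exit code: 0 means at least one matching process exists, non-zero means none was found. A rough standalone sketch of the same checks using plain subprocess instead of resource_management.core.shell (is_running is an illustrative helper, not from the patch):

import subprocess

def is_running(process_name):
    # pgrep exits 0 when at least one process matches the name
    return subprocess.call(["pgrep", process_name]) == 0

if is_running("nfsd"):
    print("native NFS server is running; stop it before starting the gateway")
if not (is_running("rpcbind") or is_running("portmap")):
    print("neither rpcbind nor portmap is running; one of them must be started")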
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_rebalance.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_rebalance.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_rebalance.py
new file mode 100644
index 0000000..1dc545e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_rebalance.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import re
+
+class HdfsParser():
+ def __init__(self):
+ self.initialLine = None
+ self.state = None
+
+ def parseLine(self, line):
+ hdfsLine = HdfsLine()
+ type, matcher = hdfsLine.recognizeType(line)
+ if(type == HdfsLine.LineType.HeaderStart):
+ self.state = 'PROCESS_STARTED'
+ elif (type == HdfsLine.LineType.Progress):
+ self.state = 'PROGRESS'
+ hdfsLine.parseProgressLog(line, matcher)
+ if(self.initialLine == None): self.initialLine = hdfsLine
+
+ return hdfsLine
+ elif (type == HdfsLine.LineType.ProgressEnd):
+ self.state = 'PROCESS_FINISED'
+ return None
+
+class HdfsLine():
+
+ class LineType:
+ HeaderStart, Progress, ProgressEnd, Unknown = range(4)
+
+
+ MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB']
+ MEMORY_PATTERN = '(?P<memmult_%d>(?P<memory_%d>(\d+)(\.|,)?(\d+)?) (?P<mult_%d>'+"|".join(MEMORY_SUFFIX)+'))'
+
+ HEADER_BEGIN_PATTERN = re.compile('Time Stamp\s+Iteration#\s+Bytes Already Moved\s+Bytes Left To Move\s+Bytes Being Moved')
+ PROGRESS_PATTERN = re.compile(
+ "(?P<date>.*?)\s+" +
+ "(?P<iteration>\d+)\s+" +
+ MEMORY_PATTERN % (1,1,1) + "\s+" +
+ MEMORY_PATTERN % (2,2,2) + "\s+" +
+ MEMORY_PATTERN % (3,3,3)
+ )
+ PROGRESS_END_PATTERN = re.compile('The cluster is balanced. Exiting...')
+
+ def __init__(self):
+ self.date = None
+ self.iteration = None
+ self.bytesAlreadyMoved = None
+ self.bytesLeftToMove = None
+ self.bytesBeingMoved = None
+ self.bytesAlreadyMovedStr = None
+ self.bytesLeftToMoveStr = None
+ self.bytesBeingMovedStr = None
+
+ def recognizeType(self, line):
+ for (type, pattern) in (
+ (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN),
+ (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN),
+ (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN)
+ ):
+ m = re.match(pattern, line)
+ if m:
+ return type, m
+ return HdfsLine.LineType.Unknown, None
+
+ def parseProgressLog(self, line, m):
+ '''
+ Parse the line of 'hdfs rebalancer' output. The example output being parsed:
+
+ Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved
+ Jul 28, 2014 5:01:49 PM 0 0 B 5.74 GB 9.79 GB
+ Jul 28, 2014 5:03:00 PM 1 0 B 5.58 GB 9.79 GB
+
+ Throws AmbariException in case of parsing errors
+
+ '''
+ m = re.match(self.PROGRESS_PATTERN, line)
+ if m:
+ self.date = m.group('date')
+ self.iteration = int(m.group('iteration'))
+
+ self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), m.group('mult_1'))
+ self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), m.group('mult_2'))
+ self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), m.group('mult_3'))
+
+ self.bytesAlreadyMovedStr = m.group('memmult_1')
+ self.bytesLeftToMoveStr = m.group('memmult_2')
+ self.bytesBeingMovedStr = m.group('memmult_3')
+ else:
+ raise AmbariException("Failed to parse line [%s]" % line)
+
+ def parseMemory(self, memorySize, multiplier_type):
+ try:
+ factor = self.MEMORY_SUFFIX.index(multiplier_type)
+ except ValueError:
+ raise AmbariException("Failed to parse memory value [%s %s]" % (memorySize, multiplier_type))
+
+ return float(memorySize) * (1024 ** factor)
+ def toJson(self):
+ return {
+ 'timeStamp' : self.date,
+ 'iteration' : self.iteration,
+
+ 'dataMoved': self.bytesAlreadyMovedStr,
+ 'dataLeft' : self.bytesLeftToMoveStr,
+ 'dataBeingMoved': self.bytesBeingMovedStr,
+
+ 'bytesMoved': self.bytesAlreadyMoved,
+ 'bytesLeft' : self.bytesLeftToMove,
+ 'bytesBeingMoved': self.bytesBeingMoved,
+ }
+ def __str__(self):
+ return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, bytesBeingMoved=%d]"%(self.date, self.iteration, self.bytesAlreadyMoved, self.bytesLeftToMove, self.bytesBeingMoved)
\ No newline at end of file
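The parseMemory() conversion above turns a value plus suffix into bytes: the multiplier is 1024 raised to the suffix's index in MEMORY_SUFFIX. A small worked example (standalone; the values are taken from the sample balancer output in the docstring):

MEMORY_SUFFIX = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB']

def parse_memory(memory_size, suffix):
    factor = MEMORY_SUFFIX.index(suffix)         # e.g. 'GB' -> 3
    return float(memory_size) * (1024 ** factor)

print(parse_memory('5.74', 'GB'))      # 5.74 * 1024**3  ~= 6.16e9 bytes
print(parse_memory('1016.16', 'MB'))   # 1016.16 * 1024**2 ~= 1.07e9 bytes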
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_snamenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_snamenode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_snamenode.py
new file mode 100644
index 0000000..8d4c40c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/hdfs_snamenode.py
@@ -0,0 +1,66 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from utils import service
+from resource_management.core.resources.system import Directory, File
+from resource_management.core.source import Template
+from resource_management.libraries.functions.check_process_status import check_process_status
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from ambari_commons import OSConst
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def snamenode(action=None, format=False):
+ if action == "configure":
+ import params
+ for fs_checkpoint_dir in params.fs_checkpoint_dirs:
+ Directory(fs_checkpoint_dir,
+ create_parents = True,
+ cd_access="a",
+ mode=0755,
+ owner=params.hdfs_user,
+ group=params.user_group)
+ File(params.exclude_file_path,
+ content=Template("exclude_hosts_list.j2"),
+ owner=params.hdfs_user,
+ group=params.user_group)
+ elif action == "start" or action == "stop":
+ import params
+ service(
+ action=action,
+ name="secondarynamenode",
+ user=params.hdfs_user,
+ create_pid_dir=True,
+ create_log_dir=True
+ )
+ elif action == "status":
+ import status_params
+ check_process_status(status_params.snamenode_pid_file)
+
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def snamenode(action=None, format=False):
+ if action == "configure":
+ pass
+ elif action == "start" or action == "stop":
+ import params
+ Service(params.snamenode_win_service_name, action=action)
+ elif action == "status":
+ import status_params
+ check_windows_service_status(status_params.snamenode_win_service_name)
+
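Both snamenode() definitions above share one name; the @OsFamilyFuncImpl decorator selects which body runs based on the detected OS family. A much-simplified sketch of that dispatch idea (detect_os_family, the registry, and the sample bodies are illustrative stand-ins, not the ambari_commons implementation):

import platform

_IMPLS = {}

def os_family_impl(family):
    # register func under (name, family) and return a wrapper that dispatches at call time
    def register(func):
        _IMPLS[(func.__name__, family)] = func
        def dispatch(*args, **kwargs):
            current = detect_os_family()
            impl = _IMPLS.get((func.__name__, current), _IMPLS[(func.__name__, 'default')])
            return impl(*args, **kwargs)
        return dispatch
    return register

def detect_os_family():
    # crude stand-in: 'winsrv' on Windows hosts, 'default' everywhere else
    return 'winsrv' if platform.system() == 'Windows' else 'default'

@os_family_impl('default')
def snamenode(action=None):
    return "linux: %s secondarynamenode" % action

@os_family_impl('winsrv')
def snamenode(action=None):
    return "windows: %s SNameNode service" % action

print(snamenode(action="start"))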
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/install_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/install_params.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/install_params.py
new file mode 100644
index 0000000..fe488c3
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/install_params.py
@@ -0,0 +1,39 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from ambari_commons import OSCheck
+
+# These parameters are supposed to be referenced at installation time, before the Hadoop environment variables have been set
+if OSCheck.is_windows_family():
+ exclude_packages = []
+else:
+ from resource_management.libraries.functions.default import default
+ from resource_management.libraries.functions.get_lzo_packages import get_lzo_packages
+ from resource_management.libraries.script.script import Script
+
+ _config = Script.get_config()
+ stack_version_unformatted = str(_config['hostLevelParams']['stack_version'])
+
+ # The logic for LZO also exists in OOZIE's params.py
+ io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None)
+ lzo_enabled = io_compression_codecs is not None and "com.hadoop.compression.lzo" in io_compression_codecs.lower()
+ lzo_packages = get_lzo_packages(stack_version_unformatted)
+
+ exclude_packages = []
+ if not lzo_enabled:
+ exclude_packages += lzo_packages
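The install-time logic above only excludes the LZO packages when io.compression.codecs does not mention the LZO codec. A standalone illustration of that decision (the codec string and package names below are invented sample values, not real stack configuration):

def compute_excluded_packages(io_compression_codecs, lzo_packages):
    lzo_enabled = (io_compression_codecs is not None
                   and "com.hadoop.compression.lzo" in io_compression_codecs.lower())
    return [] if lzo_enabled else list(lzo_packages)

codecs = "org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzoCodec"
print(compute_excluded_packages(codecs, ["lzo", "hadooplzo"]))   # [] - LZO packages stay installed
print(compute_excluded_packages(None, ["lzo", "hadooplzo"]))     # ['lzo', 'hadooplzo'] - excluded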
http://git-wip-us.apache.org/repos/asf/ambari/blob/d845449a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode.py
new file mode 100644
index 0000000..46df454
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode.py
@@ -0,0 +1,203 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import conf_select, stack_select
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+ FILE_TYPE_XML
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Directory
+from utils import service
+from hdfs import hdfs
+import journalnode_upgrade
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
+
+class JournalNode(Script):
+ def install(self, env):
+ import params
+ env.set_params(params)
+ self.install_packages(env)
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class JournalNodeDefault(JournalNode):
+
+ def get_component_name(self):
+ return "hadoop-hdfs-journalnode"
+
+ def pre_upgrade_restart(self, env, upgrade_type=None):
+ Logger.info("Executing Stack Upgrade pre-restart")
+ import params
+ env.set_params(params)
+
+ if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+ conf_select.select(params.stack_name, "hadoop", params.version)
+ stack_select.select("hadoop-hdfs-journalnode", params.version)
+
+ def start(self, env, upgrade_type=None):
+ import params
+
+ env.set_params(params)
+ self.configure(env)
+ service(
+ action="start", name="journalnode", user=params.hdfs_user,
+ create_pid_dir=True,
+ create_log_dir=True
+ )
+
+ def post_upgrade_restart(self, env, upgrade_type=None):
+ if upgrade_type == "nonrolling":
+ return
+
+ Logger.info("Executing Stack Upgrade post-restart")
+ import params
+ env.set_params(params)
+ journalnode_upgrade.post_upgrade_check()
+
+ def stop(self, env, upgrade_type=None):
+ import params
+
+ env.set_params(params)
+ service(
+ action="stop", name="journalnode", user=params.hdfs_user,
+ create_pid_dir=True,
+ create_log_dir=True
+ )
+
+ def configure(self, env):
+ import params
+
+ Directory(params.jn_edits_dir,
+ create_parents = True,
+ cd_access="a",
+ owner=params.hdfs_user,
+ group=params.user_group
+ )
+ env.set_params(params)
+ hdfs()
+ pass
+
+ def status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ check_process_status(status_params.journalnode_pid_file)
+
+ def security_status(self, env):
+ import status_params
+
+ env.set_params(status_params)
+ props_value_check = {"hadoop.security.authentication": "kerberos",
+ "hadoop.security.authorization": "true"}
+ props_empty_check = ["hadoop.security.auth_to_local"]
+ props_read_check = None
+ core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+ props_read_check)
+
+ props_value_check = None
+ props_empty_check = ['dfs.journalnode.keytab.file',
+ 'dfs.journalnode.kerberos.principal']
+ props_read_check = ['dfs.journalnode.keytab.file']
+ hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+ props_read_check)
+
+ hdfs_expectations = {}
+ hdfs_expectations.update(hdfs_site_expectations)
+ hdfs_expectations.update(core_site_expectations)
+
+ security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+ {'core-site.xml': FILE_TYPE_XML})
+ if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+ security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+ result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+ if not result_issues: # If all validations passed successfully
+ try:
+ # Double check the dict before calling execute
+ if ('hdfs-site' not in security_params or
+ 'dfs.journalnode.kerberos.keytab.file' not in security_params['hdfs-site'] or
+ 'dfs.journalnode.kerberos.principal' not in security_params['hdfs-site']):
+ self.put_structured_out({"securityState": "UNSECURED"})
+ self.put_structured_out(
+ {"securityIssuesFound": "Keytab file or principal are not set properly."})
+ return
+
+ cached_kinit_executor(status_params.kinit_path_local,
+ status_params.hdfs_user,
+ security_params['hdfs-site']['dfs.journalnode.kerberos.keytab.file'],
+ security_params['hdfs-site']['dfs.journalnode.kerberos.principal'],
+ status_params.hostname,
+ status_params.tmp_dir)
+ self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+ except Exception as e:
+ self.put_structured_out({"securityState": "ERROR"})
+ self.put_structured_out({"securityStateErrorInfo": str(e)})
+ else:
+ issues = []
+ for cf in result_issues:
+ issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+ self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+ self.put_structured_out({"securityState": "UNSECURED"})
+ else:
+ self.put_structured_out({"securityState": "UNSECURED"})
+
+ def get_log_folder(self):
+ import params
+ return params.hdfs_log_dir
+
+ def get_user(self):
+ import params
+ return params.hdfs_user
+
+ def get_pid_files(self):
+ import status_params
+ return [status_params.journalnode_pid_file]
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class JournalNodeWindows(JournalNode):
+ def install(self, env):
+ import install_params
+ self.install_packages(env)
+
+ def start(self, env):
+ import params
+ self.configure(env)
+ Service(params.journalnode_win_service_name, action="start")
+
+ def stop(self, env):
+ import params
+ Service(params.journalnode_win_service_name, action="stop")
+
+ def configure(self, env):
+ import params
+ env.set_params(params)
+ hdfs("journalnode")
+ pass
+
+ def status(self, env):
+ import status_params
+ env.set_params(status_params)
+ check_windows_service_status(status_params.journalnode_win_service_name)
+
+if __name__ == "__main__":
+ JournalNode().execute()
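security_status() above expresses its Kerberos checks as "expectations": properties that must hold a given value and properties that must simply be non-empty, and it only attempts the cached kinit once core-site reports kerberos authentication. A small standalone sketch of that expectation-style validation (the property names come from the code above; the sample values are invented):

def validate(props, value_checks, non_empty_checks):
    issues = []
    for name, expected in value_checks.items():
        if props.get(name) != expected:
            issues.append("%s should be %s" % (name, expected))
    for name in non_empty_checks:
        if not props.get(name):
            issues.append("%s must not be empty" % name)
    return issues

core_site = {"hadoop.security.authentication": "kerberos",
             "hadoop.security.authorization": "true",
             "hadoop.security.auth_to_local": "DEFAULT"}
issues = validate(core_site,
                  {"hadoop.security.authentication": "kerberos",
                   "hadoop.security.authorization": "true"},
                  ["hadoop.security.auth_to_local"])
print(issues or "no issues - proceed with the kinit check")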