You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/06/28 00:24:35 UTC

[37/51] [partial] ambari git commit: AMBARI-21349. Create BigInsights Stack Skeleton in Ambari 2.5 (alejandro)
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/alerts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/alerts/
new file mode 100755
index 0000000..7fbb9a2
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/alerts/
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import time
+import urllib2
+import json
+LABEL = 'Last Checkpoint: [{h} hours, {m} minutes, {tx} transactions]'
+NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}'
+NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}'
+NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}'
+NN_CHECKPOINT_TX_KEY = '{{hdfs-site/dfs.namenode.checkpoint.txns}}'
+NN_CHECKPOINT_PERIOD_KEY = '{{hdfs-site/dfs.namenode.checkpoint.period}}'
+def get_tokens():
+  """
+  Returns a tuple of tokens in the format {{site/property}} that will be used
+  to build the dictionary passed into execute
+  """
+def execute(parameters=None, host_name=None):
+  """
+  Returns a tuple containing the result code and a pre-formatted result label
+  Keyword arguments:
+  parameters (dictionary): a mapping of parameter key to value
+  host_name (string): the name of this host where the alert is running
+
+  The result code is one of 'OK', 'WARNING', 'CRITICAL' or 'UNKNOWN'. The
+  label is wrapped in a single-element list. host_name is accepted but not
+  read anywhere in this function.
+  """
+  if parameters is None:
+    return (('UNKNOWN', ['There were no parameters supplied to the script.']))
+  uri = None
+  scheme = 'http'
+  http_uri = None
+  https_uri = None
+  http_policy = 'HTTP_ONLY'
+  # NOTE(review): PERCENT_WARNING, PERCENT_CRITICAL, CHECKPOINT_TX_DEFAULT and
+  # CHECKPOINT_PERIOD_DEFAULT are not defined in the visible part of this file;
+  # confirm they exist at module level.
+  percent_warning = PERCENT_WARNING
+  percent_critical = PERCENT_CRITICAL
+  checkpoint_tx = CHECKPOINT_TX_DEFAULT
+  checkpoint_period = CHECKPOINT_PERIOD_DEFAULT
+  # override the defaults above with whatever the caller supplied
+  if NN_HTTP_ADDRESS_KEY in parameters:
+    http_uri = parameters[NN_HTTP_ADDRESS_KEY]
+  if NN_HTTPS_ADDRESS_KEY in parameters:
+    https_uri = parameters[NN_HTTPS_ADDRESS_KEY]
+  if NN_HTTP_POLICY_KEY in parameters:
+    http_policy = parameters[NN_HTTP_POLICY_KEY]
+  if NN_CHECKPOINT_TX_KEY in parameters:
+    checkpoint_tx = parameters[NN_CHECKPOINT_TX_KEY]
+  if NN_CHECKPOINT_PERIOD_KEY in parameters:
+    checkpoint_period = parameters[NN_CHECKPOINT_PERIOD_KEY]
+  # determine the right URI and whether to use SSL
+  # (with HTTPS_ONLY and no https address supplied, the http address is kept
+  # but queried with the https scheme; with no address at all uri stays None)
+  uri = http_uri
+  if http_policy == 'HTTPS_ONLY':
+    scheme = 'https'
+    if https_uri is not None:
+      uri = https_uri
+  # wall-clock time in milliseconds
+  current_time = int(round(time.time() * 1000))
+  last_checkpoint_time_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem".format(scheme,uri)
+  journal_transaction_info_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme,uri)
+  # start out assuming an OK status
+  label = None
+  result_code = "OK"
+  try:
+    last_checkpoint_time = int(get_value_from_jmx(last_checkpoint_time_qry,"LastCheckpointTime"))
+    # JournalTransactionInfo is itself a JSON document and is parsed a second time
+    journal_transaction_info = get_value_from_jmx(journal_transaction_info_qry,"JournalTransactionInfo")
+    journal_transaction_info_dict = json.loads(journal_transaction_info)
+    last_tx = int(journal_transaction_info_dict['LastAppliedOrWrittenTxId'])
+    most_recent_tx = int(journal_transaction_info_dict['MostRecentCheckpointTxId'])
+    # number of transactions applied or written since the most recent checkpoint
+    transaction_difference = last_tx - most_recent_tx
+    # presumably LastCheckpointTime is in milliseconds, which makes delta
+    # seconds since the last checkpoint - TODO confirm
+    delta = (current_time - last_checkpoint_time)/1000
+    label = LABEL.format(h=get_time(delta)['h'], m=get_time(delta)['m'], tx=transaction_difference)
+    # alert only when BOTH hold: more uncheckpointed transactions than
+    # checkpoint_tx, and elapsed time as a percentage of checkpoint_period at or
+    # above the threshold (critical is tested before warning)
+    if (transaction_difference > int(checkpoint_tx)) and (float(delta) / int(checkpoint_period)*100 >= int(percent_critical)):
+      result_code = 'CRITICAL'
+    elif (transaction_difference > int(checkpoint_tx)) and (float(delta) / int(checkpoint_period)*100 >= int(percent_warning)):
+      result_code = 'WARNING'
+  except Exception, e:
+    # any failure (request, parsing, conversion) is reported as UNKNOWN with
+    # the exception text as the label
+    label = str(e)
+    result_code = 'UNKNOWN'
+  return ((result_code, [label]))
+def get_time(delta):
+  h = int(delta/3600)
+  m = int((delta % 3600)/60)
+  return {'h':h, 'm':m}
+def get_value_from_jmx(query, jmx_property):
+  response = None
+  try:
+    response = urllib2.urlopen(query)
+    data =
+    data_dict = json.loads(data)
+    return data_dict["beans"][0][jmx_property]
+  finally:
+    if response is not None:
+      try:
+        response.close()
+      except:
+        pass
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/alerts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/alerts/
new file mode 100755
index 0000000..d48f831
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/alerts/
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import urllib2
+import json
+HDFS_SITE_KEY = '{{hdfs-site}}'
+NAMESERVICE_KEY = '{{hdfs-site/dfs.nameservices}}'
+NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}'
+NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}'
+DFS_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}'
+def get_tokens():
+  """
+  Returns a tuple of tokens in the format {{site/property}} that will be used
+  to build the dictionary passed into execute
+  """
+def execute(parameters=None, host_name=None):
+  """
+  Returns a tuple containing the result code and a pre-formatted result label
+  Keyword arguments:
+  parameters (dictionary): a mapping of parameter key to value
+  host_name (string): the name of this host where the alert is running
+
+  Each NameNode listed for the nameservice is asked for its 'State' over JMX
+  and sorted into active, standby or unknown. The topology counts as healthy
+  only with exactly one active and one standby NameNode.
+  """
+  # NOTE(review): the RESULT_STATE_* and HDFS_NN_STATE_* constants used below
+  # are not defined in the visible part of this file; confirm they exist at
+  # module level.
+  if parameters is None:
+    return (RESULT_STATE_UNKNOWN, ['There were no parameters supplied to the script.'])
+  # if not in HA mode, then SKIP
+  if not NAMESERVICE_KEY in parameters:
+    return (RESULT_STATE_SKIPPED, ['NameNode HA is not enabled'])
+  # hdfs-site is required
+  if not HDFS_SITE_KEY in parameters:
+    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)])
+  # determine whether or not SSL is enabled
+  is_ssl_enabled = False
+  if DFS_POLICY_KEY in parameters:
+    dfs_policy = parameters[DFS_POLICY_KEY]
+    if dfs_policy == "HTTPS_ONLY":
+      is_ssl_enabled = True
+  name_service = parameters[NAMESERVICE_KEY]
+  hdfs_site = parameters[HDFS_SITE_KEY]
+  # look for the key listing the namenode aliases of this nameservice
+  nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service
+  if not nn_unique_ids_key in hdfs_site:
+    return (RESULT_STATE_UNKNOWN, ['Unable to find unique namenode alias key {0}'.format(nn_unique_ids_key)])
+  # templates for the per-namenode address key and the JMX URL; both switch
+  # to their https variants when SSL is enabled
+  namenode_http_fragment = 'dfs.namenode.http-address.{0}.{1}'
+  jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
+  if is_ssl_enabled:
+    namenode_http_fragment = 'dfs.namenode.https-address.{0}.{1}'
+    jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
+  active_namenodes = []
+  standby_namenodes = []
+  unknown_namenodes = []
+  # now we have something like 'nn1,nn2,nn3,nn4'
+  # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id]
+  # ie dfs.namenode.http-address.hacluster.nn1
+  nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',')
+  for nn_unique_id in nn_unique_ids:
+    key = namenode_http_fragment.format(name_service,nn_unique_id)
+    # aliases without a matching address key are silently left out of all lists
+    if key in hdfs_site:
+      # use str() to ensure that unicode strings do not have the u' in them
+      value = str(hdfs_site[key])
+      try:
+        jmx_uri = jmx_uri_fragment.format(value)
+        state = get_value_from_jmx(jmx_uri,'State')
+        if state == HDFS_NN_STATE_ACTIVE:
+          active_namenodes.append(value)
+        elif state == HDFS_NN_STATE_STANDBY:
+          standby_namenodes.append(value)
+        else:
+          unknown_namenodes.append(value)
+      except:
+        # any failure while querying this namenode marks it as unknown
+        unknown_namenodes.append(value)
+  # now that the request is done, determine if this host is the host that
+  # should report the status of the HA topology
+  # NOTE(review): host_name defaults to None, and str.startswith(None) raises
+  # TypeError; confirm callers always pass a host name.
+  is_active_namenode = False
+  for active_namenode in active_namenodes:
+    if active_namenode.startswith(host_name):
+      is_active_namenode = True
+  # there's only one scenario here; there is exactly 1 active and 1 standby
+  is_topology_healthy = len(active_namenodes) == 1 and len(standby_namenodes) == 1
+  result_label = 'Active{0}, Standby{1}, Unknown{2}'.format(str(active_namenodes),
+    str(standby_namenodes), str(unknown_namenodes))
+  # Healthy Topology:
+  #   - Active NN reports the alert, standby does not
+  #
+  # Unhealthy Topology:
+  #   - Report the alert if this is the first named host
+  #   - Report the alert if not the first named host, but the other host
+  #   could not report its status
+  if is_topology_healthy:
+    if is_active_namenode is True:
+      return (RESULT_STATE_OK, [result_label])
+    else:
+      return (RESULT_STATE_SKIPPED, ['Another host will report this alert'])
+  else:
+    # dfs.namenode.rpc-address.service.alias is guaranteed in HA mode
+    first_listed_host_key = 'dfs.namenode.rpc-address.{0}.{1}'.format(
+      name_service, nn_unique_ids[0])
+    first_listed_host = ''
+    if first_listed_host_key in hdfs_site:
+      first_listed_host = hdfs_site[first_listed_host_key]
+    is_first_listed_host = False
+    if first_listed_host.startswith(host_name):
+      is_first_listed_host = True
+    if is_first_listed_host:
+      return (RESULT_STATE_CRITICAL, [result_label])
+    else:
+      # not the first listed host, but the first host might be in the unknown
+      # NOTE(review): the header comment says a non-first host should report
+      # when the other host cannot, yet this branch always returns SKIPPED.
+      return (RESULT_STATE_SKIPPED, ['Another host will report this alert'])
+def get_value_from_jmx(query, jmx_property):
+  response = None
+  try:
+    response = urllib2.urlopen(query)
+    data =
+    data_dict = json.loads(data)
+    return data_dict["beans"][0][jmx_property]
+  finally:
+    if response is not None:
+      try:
+        response.close()
+      except:
+        pass
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/files/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/files/
new file mode 100755
index 0000000..5eff79d
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/files/
@@ -0,0 +1,70 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+export hdfs_user=$1
+export conf_dir=$1
+export bin_dir=$1
+export old_mark_dir=$1
+export mark_dir=$1
+export name_dirs=$*
+export EXIT_CODE=0
+export command="namenode -format"
+export list_of_non_empty_dirs=""
+if [[ -f ${mark_file} ]] ; then
+  rm -f ${mark_file}
+  mkdir -p ${mark_dir}
+if [[ -d $old_mark_dir ]] ; then
+  mv ${old_mark_dir} ${mark_dir}
+if [[ ! -d $mark_dir ]] ; then
+  for dir in `echo $name_dirs | tr ',' ' '` ; do
+    echo "NameNode Dirname = $dir"
+    cmd="ls $dir | wc -l  | grep -q ^0$"
+    eval $cmd
+    if [[ $? -ne 0 ]] ; then
+      (( EXIT_CODE = $EXIT_CODE + 1 ))
+      list_of_non_empty_dirs="$list_of_non_empty_dirs $dir"
+    fi
+  done
+  if [[ $EXIT_CODE == 0 ]] ; then
+    su -s /bin/bash - ${hdfs_user} -c "export PATH=$PATH:${bin_dir} ; yes Y | hdfs --config ${conf_dir} ${command}"
+    (( EXIT_CODE = $EXIT_CODE | $? ))
+  else
+    echo "ERROR: Namenode directory(s) is non empty. Will not format the namenode. List of non-empty namenode dirs ${list_of_non_empty_dirs}"
+  fi
+  echo "${mark_dir} exists. Namenode DFS already formatted"
+exit $EXIT_CODE
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/files/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/files/
new file mode 100755
index 0000000..6c54188
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/files/
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import optparse
+import httplib
+# Main.
+def main():
+  parser = optparse.OptionParser(usage="usage: %prog [options] component ")
+  parser.add_option("-m", "--hosts", dest="hosts", help="Comma separated hosts list for WEB UI to check it availability")
+  parser.add_option("-p", "--port", dest="port", help="Port of WEB UI to check it availability")
+  parser.add_option("-s", "--https", dest="https", help="\"True\" if value of dfs.http.policy is \"HTTPS_ONLY\"")
+  (options, args) = parser.parse_args()
+  hosts = options.hosts.split(',')
+  port = options.port
+  https = options.https
+  for host in hosts:
+    try:
+      conn = httplib.HTTPConnection(host, port) if not https.lower() == "true" else httplib.HTTPSConnection(host, port)
+      # This can be modified to get a partial url part to be sent with request
+      conn.request("GET", "/")
+      httpCode = conn.getresponse().status
+      conn.close()
+    except Exception:
+      httpCode = 404
+    if httpCode != 200:
+      if not https:
+        print "Cannot access WEB UI on: http://" + host + ":" + port if not https.lower() == "true" else "Cannot access WEB UI on: https://" + host + ":" + port
+      exit(1)
+if __name__ == "__main__":
+  main()
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/balancer-emulator/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/balancer-emulator/
new file mode 100755
index 0000000..2939fb7
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/balancer-emulator/
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import time
+import sys
+from threading import Thread
+def write_function(path, handle, interval):
+  with open(path) as f:
+      for line in f:
+          handle.write(line)
+          handle.flush()
+          time.sleep(interval)
+# One replay thread per stream: balancer.log goes to stdout with a 1.5 second
+# pause per line, balancer-err.log goes to stderr with a 1.5 * 0.023 second pause.
+# NOTE(review): neither thread is started or joined in the visible text, so
+# nothing is written as it stands; confirm against the upstream file.
+thread = Thread(target =  write_function, args = ('balancer.log', sys.stdout, 1.5))
+threaderr = Thread(target =  write_function, args = ('balancer-err.log', sys.stderr, 1.5 * 0.023))
+def rebalancer_out():
+  write_function('balancer.log', sys.stdout)
+def rebalancer_err():
+  write_function('balancer-err.log', sys.stdout)
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..5d1e18b
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,144 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import datanode_upgrade
+from hdfs_datanode import datanode
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management import *
+from resource_management.libraries.functions.version import compare_versions, format_stack_version
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML
+from hdfs import hdfs
+class DataNode(Script):
+  # Command handlers for the HDFS DataNode component.
+  # NOTE(review): several statements in this class are truncated in this copy
+  # of the patch (they begin mid-expression) and several string literals are
+  # empty; each is flagged where it occurs. Compare with the upstream commit
+  # before relying on this text.
+  def install(self, env):
+    import params
+    self.install_packages(env)
+    env.set_params(params)
+  def configure(self, env):
+    import params
+    env.set_params(params)
+    hdfs()
+    datanode(action="configure")
+  def start(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    # configuration is always rewritten before the start action runs
+    self.configure(env)
+    datanode(action="start")
+  def stop(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    # pre-upgrade steps shutdown the datanode, so there's no need to call
+    # action=stop
+    if upgrade_type == "rolling":
+      datanode_upgrade.pre_upgrade_shutdown()
+    else:
+      datanode(action="stop")
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    check_process_status(status_params.datanode_pid_file)
+  def get_component_name(self):
+    return "hadoop-hdfs-datanode"
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    # NOTE(review): the next line lost the start of its call; presumably a
+    # logging call - confirm.
+"Executing DataNode Stack Upgrade pre-restart")
+    import params
+    env.set_params(params)
+    # NOTE(review): the version compared against is an empty string here and
+    # the two statements inside the if are truncated; presumably calls into
+    # the imported conf_select and stack_select modules - confirm.
+    if params.version and compare_versions(format_stack_version(params.version), '') >= 0:
+, "hadoop", params.version)
+"hadoop-hdfs-datanode", params.version)
+      #Execute(format("stack-select set hadoop-hdfs-datanode {version}"))
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    # NOTE(review): the next line lost the start of its call; presumably a
+    # logging call - confirm.
+"Executing DataNode Stack Upgrade post-restart")
+    import params
+    env.set_params(params)
+    # ensure the DataNode has started and rejoined the cluster
+    datanode_upgrade.post_upgrade_check()
+  def security_status(self, env):
+    # Reports the security state through put_structured_out as one of
+    # "SECURED_KERBEROS", "UNSECURED" or "ERROR".
+    import status_params
+    env.set_params(status_params)
+    # NOTE(review): the property names below are empty strings in this copy
+    # (the two dictionary keys even collide); presumably core-site security
+    # property names were lost - confirm.
+    props_value_check = {"": "kerberos",
+                         "": "true"}
+    props_empty_check = [""]
+    props_read_check = None
+    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+                                                props_read_check)
+    props_value_check = None
+    props_empty_check = ['dfs.datanode.keytab.file',
+                         'dfs.datanode.kerberos.principal']
+    props_read_check = ['dfs.datanode.keytab.file']
+    hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+                                                props_read_check)
+    # merge the expectations of both files into one dictionary
+    hdfs_expectations = {}
+    hdfs_expectations.update(core_site_expectations)
+    hdfs_expectations.update(hdfs_site_expectations)
+    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+                                                 {'core-site.xml': FILE_TYPE_XML,
+                                                  'hdfs-site.xml': FILE_TYPE_XML})
+    # NOTE(review): the core-site key tested here is also an empty string.
+    if 'core-site' in security_params and '' in security_params['core-site'] and \
+        security_params['core-site'][''].lower() == 'kerberos':
+      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+      if not result_issues:  # If all validations passed successfully
+        try:
+          # Double check the dict before calling execute
+          if ('hdfs-site' not in security_params or
+                  'dfs.datanode.keytab.file' not in security_params['hdfs-site'] or
+                  'dfs.datanode.kerberos.principal' not in security_params['hdfs-site']):
+            self.put_structured_out({"securityState": "UNSECURED"})
+            self.put_structured_out(
+              {"securityIssuesFound": "Keytab file or principal are not set property."})
+            return
+          cached_kinit_executor(status_params.kinit_path_local,
+                                status_params.hdfs_user,
+                                security_params['hdfs-site']['dfs.datanode.keytab.file'],
+                                security_params['hdfs-site']['dfs.datanode.kerberos.principal'],
+                                status_params.hostname,
+                                status_params.tmp_dir)
+          self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+        except Exception as e:
+          # a failure in the block above is reported as ERROR with its text
+          self.put_structured_out({"securityState": "ERROR"})
+          self.put_structured_out({"securityStateErrorInfo": str(e)})
+      else:
+        # one message per configuration file that failed validation
+        issues = []
+        for cf in result_issues:
+          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+        self.put_structured_out({"securityState": "UNSECURED"})
+    else:
+      self.put_structured_out({"securityState": "UNSECURED"})
+if __name__ == "__main__":
+  DataNode().execute()
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..529ca4438
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,114 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Execute
+from resource_management.core import shell
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.decorator import retry
+def pre_upgrade_shutdown():
+  """
+  Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the
+  DataNode in preparation for an upgrade. This will then periodically check
+  "getDatanodeInfo" to ensure the DataNode has shutdown correctly.
+  This function will obtain the Kerberos ticket if security is enabled.
+  :return:
+  """
+  import params
+'DataNode executing "shutdownDatanode" command in preparation for upgrade...')
+  if params.security_enabled:
+    Execute(params.dn_kinit_cmd, user = params.hdfs_user)
+  command = format('hdfs dfsadmin -shutdownDatanode {dfs_dn_ipc_address} upgrade')
+  Execute(command, user=params.hdfs_user, tries=1 )
+  # verify that the datanode is down
+  _check_datanode_shutdown()
+def post_upgrade_check():
+  """
+  Verifies that the DataNode has rejoined the cluster. This function will
+  obtain the Kerberos ticket if security is enabled.
+  :return:
+  """
+  import params
+"Checking that the DataNode has rejoined the cluster after upgrade...")
+  if params.security_enabled:
+    Execute(params.dn_kinit_cmd,user = params.hdfs_user)
+  # verify that the datanode has started and rejoined the HDFS cluster
+  _check_datanode_startup()
+@retry(times=12, sleep_time=10, err_class=Fail)
+def _check_datanode_shutdown():
+  """
+  Checks that a DataNode is down by running "hdfs dfsamin getDatanodeInfo"
+  several times, pausing in between runs. Once the DataNode stops responding
+  this method will return, otherwise it will raise a Fail(...) and retry
+  automatically.
+  :return:
+  """
+  import params
+  command = format('hdfs dfsadmin -getDatanodeInfo {dfs_dn_ipc_address}')
+  try:
+    Execute(command, user=params.hdfs_user, tries=1)
+  except:
+"DataNode has successfully shutdown for upgrade.")
+    return
+"DataNode has not shutdown.")
+  raise Fail('DataNode has not shutdown.')
+@retry(times=12, sleep_time=10, err_class=Fail)
+def _check_datanode_startup():
+  """
+  Checks that a DataNode is reported as being alive via the
+  "hdfs dfsadmin -report -live" command. Once the DataNode is found to be
+  alive this method will return, otherwise it will raise a Fail(...) and retry
+  automatically.
+  :return:
+  """
+  import params
+  try:
+    # 'su - hdfs -c "hdfs dfsadmin -report -live"'
+    command = 'hdfs dfsadmin -report -live'
+    return_code, hdfs_output =, user=params.hdfs_user)
+  except:
+    raise Fail('Unable to determine if the DataNode has started after upgrade.')
+  if return_code == 0:
+    if params.hostname.lower() in hdfs_output.lower():
+"DataNode {0} reports that it has rejoined the cluster.".format(params.hostname))
+      return
+    else:
+      raise Fail("DataNode {0} was not found in the list of live DataNodes".format(params.hostname))
+  # return_code is not 0, fail
+  raise Fail("Unable to determine if the DataNode has started after upgrade (result code {0})".format(str(return_code)))
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..002b87d
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,129 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+Ambari Agent
+from resource_management import *
+import sys
+import os
+def hdfs(name=None):
+  """
+  Declares the configuration resources shared by the HDFS components: the
+  limits file, the optional hadoop-policy / ssl-client / ssl-server XML
+  files, hdfs-site.xml, core-site.xml and the slaves file.
+  The name argument is accepted but not read in this function.
+  """
+  import params
+  if params.create_lib_snappy_symlinks:
+    install_snappy()
+  # On some OS this folder could be not exists, so we will create it before pushing there files
+  Directory(params.limits_conf_dir,
+            create_parents=True,
+            owner='root',
+            group='root'
+  )
+  File(os.path.join(params.limits_conf_dir, 'hdfs.conf'),
+       owner='root',
+       group='root',
+       mode=0644,
+       content=Template("hdfs.conf.j2")
+  )
+  # with security enabled the slaves file below is owned by root, otherwise
+  # by the hdfs user
+  # NOTE(review): tc_mode is assigned here but never passed to any resource
+  # in this function.
+  if params.security_enabled:
+    tc_mode = 0644
+    tc_owner = "root"
+  else:
+    tc_mode = None
+    tc_owner = params.hdfs_user
+  # the three optional files are written only when their configuration
+  # section is present
+  if "hadoop-policy" in params.config['configurations']:
+    XmlConfig("hadoop-policy.xml",
+              conf_dir=params.hadoop_conf_dir,
+              configurations=params.config['configurations']['hadoop-policy'],
+              configuration_attributes=params.config['configuration_attributes']['hadoop-policy'],
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+  if "ssl-client" in params.config['configurations']:
+    XmlConfig("ssl-client.xml",
+              conf_dir=params.hadoop_conf_dir,
+              configurations=params.config['configurations']['ssl-client'],
+              configuration_attributes=params.config['configuration_attributes']['ssl-client'],
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    # ssl-client.xml is written a second time into the secure conf directory
+    Directory(params.hadoop_conf_secure_dir,
+              create_parents=True,
+              owner='root',
+              group=params.user_group,
+              cd_access='a',
+              )
+    XmlConfig("ssl-client.xml",
+              conf_dir=params.hadoop_conf_secure_dir,
+              configurations=params.config['configurations']['ssl-client'],
+              configuration_attributes=params.config['configuration_attributes']['ssl-client'],
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+  if "ssl-server" in params.config['configurations']:
+    XmlConfig("ssl-server.xml",
+              conf_dir=params.hadoop_conf_dir,
+              configurations=params.config['configurations']['ssl-server'],
+              configuration_attributes=params.config['configuration_attributes']['ssl-server'],
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+  # hdfs-site and core-site are read unconditionally
+  XmlConfig("hdfs-site.xml",
+            conf_dir=params.hadoop_conf_dir,
+            configurations=params.config['configurations']['hdfs-site'],
+            configuration_attributes=params.config['configuration_attributes']['hdfs-site'],
+            owner=params.hdfs_user,
+            group=params.user_group
+  )
+  XmlConfig("core-site.xml",
+            conf_dir=params.hadoop_conf_dir,
+            configurations=params.config['configurations']['core-site'],
+            configuration_attributes=params.config['configuration_attributes']['core-site'],
+            owner=params.hdfs_user,
+            group=params.user_group,
+            mode=0644
+  )
+  File(os.path.join(params.hadoop_conf_dir, 'slaves'),
+       owner=tc_owner,
+       content=Template("slaves.j2")
+  )
+def install_snappy():
+  import params
+  Directory([params.so_target_dir_x86, params.so_target_dir_x64],
+            create_parents=True,
+  )
+  Link(params.so_target_x86,
+       to=params.so_src_x86,
+  )
+  Link(params.so_target_x64,
+       to=params.so_src_x64,
+  )
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..39cd1f4
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,112 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+from resource_management import *
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+  FILE_TYPE_XML
+from hdfs import hdfs
+from utils import service
+class HdfsClient(Script):
+  """
+  Script subclass for the HDFS client component. It has no running process:
+  start/stop only load params and status() always raises
+  ClientComponentHasNoStatus.
+  """
+  def get_component_name(self):
+    """Return the component name string, "hadoop-client"."""
+    return "hadoop-client"
+  def install(self, env):
+    """Install the packages, publish params to env, then write the configuration."""
+    import params
+    self.install_packages(env)
+    env.set_params(params)
+    self.config(env)
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    """Run when params.version is set and compares >= the version literal below."""
+    import params
+    env.set_params(params)
+    # NOTE(review): the version literal compared against is empty here, and the
+    # two lines that follow start mid-call; the literal and the callee names
+    # appear to have been lost when this patch was archived. conf_select and
+    # stack_select are imported by this file and are presumably the targets;
+    # confirm against the repository.
+    if params.version and compare_versions(format_stack_version(params.version), '') >= 0:
+, "hadoop", params.version)
+"hadoop-client", params.version)
+  def start(self, env, upgrade_type=False):
+    """Only publishes params to env; nothing is started."""
+    import params
+    env.set_params(params)
+  def stop(self, env, upgrade_type=False):
+    """Only publishes params to env; nothing is stopped."""
+    import params
+    env.set_params(params)
+  def status(self, env):
+    """Always raises ClientComponentHasNoStatus."""
+    raise ClientComponentHasNoStatus()
+  def config(self, env):
+    """Write the HDFS configuration by calling hdfs()."""
+    import params
+    hdfs()
+  def security_status(self, env):
+    """
+    Report the security state of this component through put_structured_out.
+    Emits "securityState" as SECURED_KERBEROS, UNSECURED or ERROR, plus
+    "securityIssuesFound" / "securityStateErrorInfo" where applicable.
+    """
+    import status_params
+    env.set_params(status_params)
+    # NOTE(review): the property names below are empty strings (the dict even
+    # repeats the same "" key, so only the last value survives). They look like
+    # core-site property names lost in archiving; confirm against the repository.
+    props_value_check = {"": "kerberos",
+                         "": "true"}
+    props_empty_check = [""]
+    props_read_check = None
+    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+                                                props_read_check)
+    hdfs_expectations ={}
+    hdfs_expectations.update(core_site_expectations)
+    # Read core-site.xml from the configuration directory on disk.
+    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+                                                   {'core-site.xml': FILE_TYPE_XML})
+    # Continue only when the core-site value read below equals 'kerberos'
+    # (NOTE(review): the key is the same empty string as above).
+    if 'core-site' in security_params and '' in security_params['core-site'] and \
+        security_params['core-site'][''].lower() == 'kerberos':
+      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+      if not result_issues: # If all validations passed successfully
+        if status_params.hdfs_user_principal or status_params.hdfs_user_keytab:
+          try:
+            # A successful kinit is what marks the component as secured.
+            cached_kinit_executor(status_params.kinit_path_local,
+                       status_params.hdfs_user,
+                       status_params.hdfs_user_keytab,
+                       status_params.hdfs_user_principal,
+                       status_params.hostname,
+                       status_params.tmp_dir)
+            self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+          except Exception as e:
+            # Any kinit failure is reported as ERROR together with its message.
+            self.put_structured_out({"securityState": "ERROR"})
+            self.put_structured_out({"securityStateErrorInfo": str(e)})
+        else:
+          self.put_structured_out({"securityIssuesFound": "hdfs principal and/or keytab file is not specified"})
+          self.put_structured_out({"securityState": "UNSECURED"})
+      else:
+        # Flatten the per-file validation results into one message.
+        issues = []
+        for cf in result_issues:
+          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+        self.put_structured_out({"securityState": "UNSECURED"})
+    else:
+      self.put_structured_out({"securityState": "UNSECURED"})
+# Script entry point: run the command through HdfsClient.execute().
+if __name__ == "__main__":
+  HdfsClient().execute()
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..420e556
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,75 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import os
+from resource_management import *
+#from resource_management.libraries.functions.dfs_datanode_helper import handle_dfs_data_dir
+from resource_management.libraries.functions.mounted_dirs_helper import handle_mounted_dirs
+from utils import service
+def create_dirs(data_dir):
+  """
+  :param data_dir: The directory to create
+  :param params: parameters
+  """
+  import params
+  Directory(data_dir,
+            create_parents=True,
+            cd_access="a",
+            mode=0755,
+            owner=params.hdfs_user,
+            group=params.user_group,
+            ignore_failures=True
+  )
+def datanode(action=None):
+  import params
+  if action == "configure":
+    Directory(params.dfs_domain_socket_dir,
+              create_parents=True,
+              mode=0751,
+              owner=params.hdfs_user,
+              group=params.user_group)
+    # handle_mounted_dirs ensures that we don't create dfs data dirs which are temporary unavailable (unmounted), and intended to reside on a different mount.
+    data_dir_to_mount_file_content = handle_mounted_dirs(create_dirs, params.dfs_data_dirs, params.data_dir_mount_file, params)
+    # create a history file used by handle_mounted_dirs
+    File(params.data_dir_mount_file,
+         owner=params.hdfs_user,
+         group=params.user_group,
+         mode=0644,
+         content=data_dir_to_mount_file_content
+    )
+  elif action == "start" or action == "stop":
+    Directory(params.hadoop_pid_dir_prefix,
+              mode=0755,
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    service(
+      action=action, name="datanode",
+      user=params.hdfs_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+  elif action == "status":
+    import status_params
+    check_process_status(status_params.datanode_pid_file)
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..f37494c
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,483 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import os.path
+import time
+from resource_management import *
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions import Direction
+from import as_user
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+from resource_management.core import shell
+from resource_management.core.source import Template
+from resource_management.core.resources.system import File, Execute, Directory
+from resource_management.core.resources.service import Service
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from utils import get_dfsadmin_base_command
+from utils import service, safe_zkfc_op, is_previous_fs_image
+if OSCheck.is_windows_family():
+  from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
+def namenode(action=None, do_format=True, upgrade_type=None, env=None):
+  import params
+  #we need this directory to be present before any action(HA manual steps for
+  #additional namenode)
+  if action == "configure":
+    create_name_dirs(params.dfs_name_dir)
+  if action == "start":
+    if do_format:
+      format_namenode()
+      pass
+    File(params.exclude_file_path,
+         content=Template("exclude_hosts_list.j2"),
+         owner=params.hdfs_user,
+         group=params.user_group
+    )
+    Directory(params.hadoop_pid_dir_prefix,
+              mode=0755,
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    if params.dfs_ha_enabled and \
+      params.dfs_ha_namenode_standby is not None and \
+      params.hostname == params.dfs_ha_namenode_standby:
+        # if the current host is the standby NameNode in an HA deployment
+        # run the bootstrap command, to start the NameNode in standby mode
+        # this requires that the active NameNode is already up and running,
+        # so this execute should be re-tried upon failure, up to a timeout
+        success = bootstrap_standby_namenode(params)
+        if not success:
+          raise Fail("Could not bootstrap standby namenode")
+    if upgrade_type == "rolling":
+      # Must start Zookeeper Failover Controller if it exists on this host because it could have been killed in order to initiate the failover.
+      safe_zkfc_op(action, env)
+    #options = "-rollingUpgrade started" if rolling_restart else ""
+    options = ""
+    if upgrade_type == "rolling":
+      if params.upgrade_direction == Direction.UPGRADE:
+        options = "-rollingUpgrade started"
+      elif params.upgrade_direction == Direction.DOWNGRADE:
+        options = "-rollingUpgrade downgrade"
+    elif upgrade_type == "nonrolling":
+      is_previous_image_dir = is_previous_fs_image()
+"Previous file system image dir present is {is_previous_image_dir}"))
+      if params.upgrade_direction == Direction.UPGRADE:
+        options = "-rollingUpgrade started"
+      elif params.upgrade_direction == Direction.DOWNGRADE:
+        options = "-rollingUpgrade downgrade"
+"Option for start command: {options}"))
+    service(
+      action="start",
+      name="namenode",
+      user=params.hdfs_user,
+      options=options,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+    if params.security_enabled:
+      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+              user = params.hdfs_user)
+    if params.dfs_ha_enabled:
+      is_active_namenode_cmd = as_user(format("hdfs --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+    else:
+      is_active_namenode_cmd = True
+    # During NonRolling Upgrade, both NameNodes are initially down,
+    # so no point in checking if this is the active or standby.
+    if upgrade_type == "nonrolling":
+      is_active_namenode_cmd = False
+    # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____|
+    # no-HA                 | ON -> OFF                | Yes                      |
+    # HA and active         | ON -> OFF                | Yes                      |
+    # HA and standby        | no change                | no check                 |
+    # RU with HA on active  | ON -> OFF                | Yes                      |
+    # RU with HA on standby | ON -> OFF                | Yes                      |
+    # EU with HA on active  | no change                | no check                 |
+    # EU with HA on standby | no change                | no check                 |
+    # EU non-HA             | no change                | no check                 |
+    check_for_safemode_off = False
+    msg = ""
+    if params.dfs_ha_enabled:
+      if upgrade_type is not None:
+        check_for_safemode_off = True
+        msg = "Must wait to leave safemode since High Availability is enabled during a Stack Upgrade"
+      else:
+"Wait for NameNode to become active.")
+        if is_active_namenode(): # active
+          check_for_safemode_off = True
+          msg = "Must wait to leave safemode since High Availability is enabled and this is the Active NameNode."
+        else:
+          msg = "Will remain in the current safemode state."
+    else:
+      msg = "Must wait to leave safemode since High Availability is not enabled."
+      check_for_safemode_off = True
+    # During a NonRolling (aka Express Upgrade), stay in safemode since the DataNodes are down.
+    stay_in_safe_mode = False
+    if upgrade_type == "nonrolling":
+      stay_in_safe_mode = True
+    if check_for_safemode_off:
+"Stay in safe mode: {0}".format(stay_in_safe_mode))
+      if not stay_in_safe_mode:
+        wait_for_safemode_off()
+    # Always run this on non-HA, or active NameNode during HA.
+    create_hdfs_directories(is_active_namenode_cmd)
+    '''if params.dfs_ha_enabled:
+      dfs_check_nn_status_cmd = as_user(format("hdfs --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+    else:
+      dfs_check_nn_status_cmd = None
+    namenode_safe_mode_off = format("hdfs dfsadmin -fs {namenode_address} -safemode get | grep 'Safe mode is OFF'")
+    # If HA is enabled and it is in standby, then stay in safemode, otherwise, leave safemode.
+    leave_safe_mode = True
+    if dfs_check_nn_status_cmd is not None:
+      code, out = # If active NN, code will be 0
+      if code != 0:
+        leave_safe_mode = False
+    if leave_safe_mode:
+      # First check if Namenode is not in 'safemode OFF' (equivalent to safemode ON), if so, then leave it
+      code, out =
+      if code != 0:
+        leave_safe_mode_cmd = format("hdfs --config {hadoop_conf_dir} dfsadmin -fs {namenode_address} -safemode leave")
+        Execute(leave_safe_mode_cmd,
+                tries=10,
+                try_sleep=10,
+                user=params.hdfs_user,
+                path=[params.hadoop_bin_dir],
+        )
+    # Verify if Namenode should be in safemode OFF
+    Execute(namenode_safe_mode_off,
+            tries=40,
+            try_sleep=10,
+            path=[params.hadoop_bin_dir],
+            user=params.hdfs_user,
+            only_if=dfs_check_nn_status_cmd #skip when HA not active
+    )
+    create_hdfs_directories(dfs_check_nn_status_cmd)'''
+  if action == "stop":
+    service(
+      action="stop", name="namenode",
+      user=params.hdfs_user
+    )
+  if action == "decommission":
+    decommission()
+def namenode(action=None, do_format=True, upgrade_type=None, env=None):
+  if action == "configure":
+    pass
+  elif action == "start":
+    import params
+    #TODO: Replace with format_namenode()
+    namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED")
+    if not os.path.exists(namenode_format_marker):
+      hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
+      Execute("%s namenode -format" % (hadoop_cmd))
+      open(namenode_format_marker, 'a').close()
+    Service(params.namenode_win_service_name, action=action)
+  elif action == "stop":
+    import params
+    Service(params.namenode_win_service_name, action=action)
+  elif action == "status":
+    import status_params
+    check_windows_service_status(status_params.namenode_win_service_name)
+  elif action == "decommission":
+    decommission()
+def create_name_dirs(directories):
+  import params
+  dirs = directories.split(",")
+  Directory(dirs,
+            mode=0755,
+            owner=params.hdfs_user,
+            group=params.user_group,
+            create_parents=True,
+            cd_access="a",
+  )
+def create_hdfs_directories(check):
+  import params
+  params.HdfsResource("/tmp",
+                       type="directory",
+                       action="create_on_execute",
+                       owner=params.hdfs_user,
+                       mode=0777,
+                       only_if=check
+  )
+  params.HdfsResource(params.smoke_hdfs_user_dir,
+                       type="directory",
+                       action="create_on_execute",
+                       owner=params.smoke_user,
+                       mode=params.smoke_hdfs_user_mode,
+                       only_if=check
+  )
+  params.HdfsResource(None,
+                      action="execute",
+                      only_if=check #skip creation when HA not active
+  )
+def format_namenode(force=None):
+  import params
+  old_mark_dir = params.namenode_formatted_old_mark_dirs
+  mark_dir = params.namenode_formatted_mark_dirs
+  dfs_name_dir = params.dfs_name_dir
+  hdfs_user = params.hdfs_user
+  hadoop_conf_dir = params.hadoop_conf_dir
+  if not params.dfs_ha_enabled:
+    if force:
+      ExecuteHadoop('namenode -format',
+                    kinit_override=True,
+                    bin_dir=params.hadoop_bin_dir,
+                    conf_dir=hadoop_conf_dir)
+    else:
+      if not is_namenode_formatted(params):
+        Execute(format("yes Y | hdfs --config {hadoop_conf_dir} namenode -format"),
+                user = params.hdfs_user,
+                path = [params.hadoop_bin_dir]
+        )
+        for m_dir in mark_dir:
+          Directory(m_dir,
+            create_parents = True
+          )
+  else:
+    if params.dfs_ha_namenode_active is not None and \
+       params.hostname == params.dfs_ha_namenode_active:
+      # check and run the format command in the HA deployment scenario
+      # only format the "active" namenode in an HA deployment
+      if force:
+        ExecuteHadoop('namenode -format',
+                      kinit_override=True,
+                      bin_dir=params.hadoop_bin_dir,
+                      conf_dir=hadoop_conf_dir)
+      else:
+        if not is_namenode_formatted(params):
+          Execute(format("yes Y | hdfs --config {hadoop_conf_dir} namenode -format"),
+                  user = params.hdfs_user,
+                  path = [params.hadoop_bin_dir]
+          )
+          for m_dir in mark_dir:
+            Directory(m_dir,
+              create_parents = True
+            )
+def is_namenode_formatted(params):
+  """
+  Decide whether the NameNode should be treated as already formatted.
+  :param params: object providing namenode_formatted_old_mark_dirs,
+    namenode_formatted_mark_dirs and dfs_name_dir (comma-separated string)
+  :return: True if a marker directory exists or a name directory is non-empty,
+    False if every name directory is empty
+  """
+  old_mark_dirs = params.namenode_formatted_old_mark_dirs
+  mark_dirs = params.namenode_formatted_mark_dirs
+  nn_name_dirs = params.dfs_name_dir.split(',')
+  marked = False
+  # Check if name directories have been marked as formatted
+  for mark_dir in mark_dirs:
+    if os.path.isdir(mark_dir):
+      marked = True
+      print format("{mark_dir} exists. Namenode DFS already formatted")
+  # Ensure that all mark dirs created for all name directories
+  if marked:
+    for mark_dir in mark_dirs:
+      Directory(mark_dir,
+        create_parents = True
+      )
+    return marked
+  # Move all old format markers to new place
+  for old_mark_dir in old_mark_dirs:
+    if os.path.isdir(old_mark_dir):
+      # Old marker is a directory: copy it to every new marker location, then delete it.
+      for mark_dir in mark_dirs:
+        Execute(('cp', '-ar', old_mark_dir, mark_dir),
+                sudo = True
+        )
+        marked = True
+      Directory(old_mark_dir,
+        action = "delete"
+      )
+    elif os.path.isfile(old_mark_dir):
+      # Old marker is a plain file: create the new marker dirs, then delete it.
+      for mark_dir in mark_dirs:
+        Directory(mark_dir,
+                  create_parents = True,
+        )
+      Directory(old_mark_dir,
+        action = "delete"
+      )
+      marked = True
+  # Check if name dirs are not empty
+  # The shell pipeline succeeds only when the directory listing has zero lines;
+  # a failure raises, which is taken to mean "not empty".
+  # NOTE(review): a successful check sets marked = False, overriding a True set
+  # by the old-marker migration above; confirm that this is intended.
+  for name_dir in nn_name_dirs:
+    try:
+      Execute(format("ls {name_dir} | wc -l  | grep -q ^0$"),
+      )
+      marked = False
+    except Exception:
+      marked = True
+      print format("ERROR: Namenode directory(s) is non empty. Will not format the namenode. List of non-empty namenode dirs {nn_name_dirs}")
+      break
+  return marked
+def decommission():
+  import params
+  hdfs_user = params.hdfs_user
+  conf_dir = params.hadoop_conf_dir
+  user_group = params.user_group
+  nn_kinit_cmd = params.nn_kinit_cmd
+  File(params.exclude_file_path,
+       content=Template("exclude_hosts_list.j2"),
+       owner=hdfs_user,
+       group=user_group
+  )
+  if not params.update_exclude_file_only:
+    Execute(nn_kinit_cmd,
+            user=hdfs_user
+    )
+    if params.dfs_ha_enabled:
+      # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
+      # need to execute each command scoped to a particular namenode
+      nn_refresh_cmd = format('dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
+    else:
+      nn_refresh_cmd = format('dfsadmin -refreshNodes')
+    ExecuteHadoop(nn_refresh_cmd,
+                  user=hdfs_user,
+                  conf_dir=conf_dir,
+                  kinit_override=True,
+                  bin_dir=params.hadoop_bin_dir)
+def bootstrap_standby_namenode(params):
+  try:
+    iterations = 50
+    bootstrap_cmd = "hdfs namenode -bootstrapStandby -nonInteractive"
+"Boostrapping standby namenode: %s" % (bootstrap_cmd))
+    for i in range(iterations):
+'Try %d out of %d' % (i+1, iterations))
+      code, out =, logoutput=False, user=params.hdfs_user)
+      if code == 0:
+"Standby namenode bootstrapped successfully")
+        return True
+      elif code == 5:
+"Standby namenode already bootstrapped")
+        return True
+      else:
+        Logger.warning('Bootstrap standby namenode failed with %d error code. Will retry' % (code))
+  except Exception as ex:
+    Logger.error('Bootstrap standby namenode threw an exception. Reason %s' %(str(ex)))
+  return False
+def is_active_namenode():
+  """
+  Checks if current NameNode is active. Waits up to 30 seconds. If other NameNode is active returns False.
+  :return: True if current NameNode is active, False otherwise
+  """
+  import params
+  if params.dfs_ha_enabled:
+    is_active_this_namenode_cmd = as_user(format("hdfs --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+    is_active_other_namenode_cmd = as_user(format("hdfs --config {hadoop_conf_dir} haadmin -getServiceState {other_namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+    for i in range(0, 5):
+      code, out = # If active NN, code will be 0
+      if code == 0: # active
+        return True
+      code, out = # If other NN is active, code will be 0
+      if code == 0: # other NN is active
+        return False
+      if i < 4: # Do not sleep after last iteration
+        time.sleep(6)
+"Active NameNode is not found.")
+    return False
+  else:
+    return True
+def wait_for_safemode_off(afterwait_sleep=0, execute_kinit=False):
+  """
+  During NonRolling (aka Express Upgrade), after starting NameNode, which is still in safemode, and then starting
+  all of the DataNodes, we need for NameNode to receive all of the block reports and leave safemode.
+  If HA is present, then this command will run individually on each NameNode, which checks for its own address.
+  """
+  import params
+"Wait to leafe safemode since must transition from ON to OFF.")
+  if params.security_enabled and execute_kinit:
+    kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
+    Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+  try:
+    # Note, this fails if namenode_address isn't prefixed with "params."
+    dfsadmin_base_command = get_dfsadmin_base_command('hdfs', use_specific_namenode=True)
+    is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'"
+    # Wait up to 30 mins
+    Execute(is_namenode_safe_mode_off,
+            tries=115,
+            try_sleep=10,
+            user=params.hdfs_user,
+            logoutput=True
+            )
+    # Wait a bit more since YARN still depends on block reports coming in.
+    # Also saw intermittent errors with HBASE service check if it was done too soon.
+    time.sleep(afterwait_sleep)
+  except Fail:
+    Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..efebfc5
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,72 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+from resource_management.core.resources import Directory
+from resource_management.core import shell
+from utils import service
+import subprocess,os
+# NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
+# on Linux such as CentOS6.2.
+def prepare_rpcbind():
+"check if native nfs server is running")
+  p, output ="pgrep nfsd")
+  if p == 0 :
+"native nfs server is running. shutting it down...")
+    # shutdown nfs
+"service nfs stop")
+"service nfs-kernel-server stop")
+"check if the native nfs server is down...")
+    p, output ="pgrep nfsd")
+    if p == 0 :
+      raise Fail("Failed to shutdown native nfs service")
+"check if rpcbind or portmap is running")
+  p, output ="pgrep rpcbind")
+  q, output ="pgrep portmap")
+  if p!=0 and q!=0 :
+"no portmap or rpcbind running. starting one...")
+    p, output ="service rpcbind start")
+    q, output ="service portmap start")
+    if p!=0 and q!=0 :
+      raise Fail("Failed to start rpcbind or portmap")
+"now we are ready to start nfs gateway")
+def nfsgateway(action=None, format=False):
+  import params
+  if action== "start":
+    prepare_rpcbind()
+  if action == "configure":
+    return
+  elif action == "start" or action == "stop":
+    service(
+      action=action,
+      name="nfs3",
+      user=params.root_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..aea6fce
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import re
+class HdfsParser():
+  def __init__(self):
+    self.initialLine = None
+    self.state = None
+  def parseLine(self, line):
+    hdfsLine = HdfsLine()
+    type, matcher = hdfsLine.recognizeType(line)
+    if(type == HdfsLine.LineType.HeaderStart):
+      self.state = 'PROCESS_STARTED'
+    elif (type == HdfsLine.LineType.Progress):
+      self.state = 'PROGRESS'
+      hdfsLine.parseProgressLog(line, matcher)
+      if(self.initialLine == None): self.initialLine = hdfsLine
+      return hdfsLine
+    elif (type == HdfsLine.LineType.ProgressEnd):
+      self.state = 'PROCESS_FINISED'
+    return None
+class HdfsLine():
+  class LineType:
+    HeaderStart, Progress, ProgressEnd, Unknown = range(4)
+  MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB']
+  MEMORY_PATTERN = '(?P<memmult_%d>(?P<memory_%d>(\d+)(.|,)?(\d+)?) (?P<mult_%d>'+"|".join(MEMORY_SUFFIX)+'))'
+  HEADER_BEGIN_PATTERN = re.compile('Time Stamp\w+Iteration#\w+Bytes Already Moved\w+Bytes Left To Move\w+Bytes Being Moved')
+  PROGRESS_PATTERN = re.compile(
+                            "(?P<date>.*?)\s+" +
+                            "(?P<iteration>\d+)\s+" +
+                            MEMORY_PATTERN % (1,1,1) + "\s+" +
+                            MEMORY_PATTERN % (2,2,2) + "\s+" +
+                            MEMORY_PATTERN % (3,3,3)
+                            )
+  PROGRESS_END_PATTERN = re.compile('(The cluster is balanced. Exiting...|The cluster is balanced. Exiting...)')
+  def __init__(self):
+ = None
+    self.iteration = None
+    self.bytesAlreadyMoved = None
+    self.bytesLeftToMove = None
+    self.bytesBeingMoved = None
+    self.bytesAlreadyMovedStr = None
+    self.bytesLeftToMoveStr = None
+    self.bytesBeingMovedStr = None
+  def recognizeType(self, line):
+    for (type, pattern) in (
+                            (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN),
+                            (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN),
+                            (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN)
+                            ):
+      m = re.match(pattern, line)
+      if m:
+        return type, m
+    return HdfsLine.LineType.Unknown, None
+  def parseProgressLog(self, line, m):
+    '''
+    Parse the line of 'hdfs rebalancer' output. The example output being parsed:
+    Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
+    Jul 28, 2014 5:01:49 PM           0                  0 B             5.74 GB            9.79 GB
+    Jul 28, 2014 5:03:00 PM           1                  0 B             5.58 GB            9.79 GB
+    Throws AmbariException in case of parsing errors
+    '''
+    m = re.match(self.PROGRESS_PATTERN, line)
+    if m:
+ ='date')
+      self.iteration = int('iteration'))
+      self.bytesAlreadyMoved = self.parseMemory('memory_1'),'mult_1'))
+      self.bytesLeftToMove = self.parseMemory('memory_2'),'mult_2'))
+      self.bytesBeingMoved = self.parseMemory('memory_3'),'mult_3'))
+      self.bytesAlreadyMovedStr ='memmult_1')
+      self.bytesLeftToMoveStr ='memmult_2')
+      self.bytesBeingMovedStr ='memmult_3')
+    else:
+      raise AmbariException("Failed to parse line [%s]")
+  def parseMemory(self, memorySize, multiplier_type):
+    try:
+      factor = self.MEMORY_SUFFIX.index(multiplier_type)
+    except ValueError:
+      raise AmbariException("Failed to memory value [%s %s]" % (memorySize, multiplier_type))
+    return float(memorySize) * (1024 ** factor)
+  def toJson(self):
+    return {
+            'timeStamp' :,
+            'iteration' : self.iteration,
+            'dataMoved': self.bytesAlreadyMovedStr,
+            'dataLeft' : self.bytesLeftToMoveStr,
+            'dataBeingMoved': self.bytesBeingMovedStr,
+            'bytesMoved': self.bytesAlreadyMoved,
+            'bytesLeft' : self.bytesLeftToMove,
+            'bytesBeingMoved': self.bytesBeingMoved,
+          }
+  def __str__(self):
+    return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, bytesBeingMoved=%d]"%(, self.iteration, self.bytesAlreadyMoved, self.bytesLeftToMove, self.bytesBeingMoved)
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..3135924
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,50 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+from resource_management import *
+from utils import service
+def snamenode(action=None, format=False):
+  """
+  Declare the resources for the secondary NameNode according to action.
+  "configure": one Directory per entry of params.fs_checkpoint_dirs, then the
+  exclude file rendered from the exclude_hosts_list.j2 template.
+  "start" / "stop": the pid directory prefix, then service() is called with
+  the same action for "secondarynamenode".
+  Any other action does nothing. The format argument is not used here.
+  """
+  import params
+  # Ownership shared by every resource declared below.
+  ownership = dict(owner=params.hdfs_user, group=params.user_group)
+  if action in ("start", "stop"):
+    Directory(params.hadoop_pid_dir_prefix, mode=0755, **ownership)
+    service(action=action,
+            name="secondarynamenode",
+            user=params.hdfs_user,
+            create_pid_dir=True,
+            create_log_dir=True)
+    return
+  if action != "configure":
+    return
+  for checkpoint_dir in params.fs_checkpoint_dirs:
+    Directory(checkpoint_dir,
+              create_parents=True,
+              cd_access="a",
+              mode=0755,
+              **ownership)
+  File(params.exclude_file_path,
+       content=Template("exclude_hosts_list.j2"),
+       **ownership)
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..1bbbd50
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,169 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+from resource_management import *
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions.version import compare_versions, \
+  format_stack_version
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+  FILE_TYPE_XML
+from utils import service
+from hdfs import hdfs
+import journalnode_upgrade
+class JournalNode(Script):
+  """
+  Command script for the HDFS JournalNode component: install, configure,
+  start, stop, status, security status and the stack-upgrade restart hooks.
+  """
+  def get_component_name(self):
+    """Return the component name used for this script."""
+    return "hadoop-hdfs-journalnode"
+  def install(self, env):
+    """Install the packages and register params with the environment."""
+    import params
+    self.install_packages(env)
+    env.set_params(params)
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    """Select the hadoop configuration and stack version before the restart."""
+    Logger.info("Executing Stack Upgrade pre-restart")
+    import params
+    env.set_params(params)
+    # NOTE(review): the version literal and the first argument of
+    # conf_select.select() were lost in this copy of the patch; '4.0.0.0' and
+    # params.stack_name are restored from the upstream script - confirm
+    if params.version and compare_versions(format_stack_version(params.version), '4.0.0.0') >= 0:
+      conf_select.select(params.stack_name, "hadoop", params.version)
+      stack_select.select("hadoop-hdfs-journalnode", params.version)
+      #Execute(format("stack-select set hadoop-hdfs-journalnode {version}"))
+  def start(self, env, upgrade_type=None):
+    """Configure the component, declare the pid directory and start the service."""
+    import params
+    env.set_params(params)
+    self.configure(env)
+    Directory(params.hadoop_pid_dir_prefix,
+              mode=0755,
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    service(
+      action="start", name="journalnode", user=params.hdfs_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    """Run the journalnode post-upgrade check; skipped for "nonrolling" upgrades."""
+    if upgrade_type == "nonrolling":
+      return
+    Logger.info("Executing Stack Upgrade post-restart")
+    import params
+    env.set_params(params)
+    journalnode_upgrade.post_upgrade_check()
+  def stop(self, env, upgrade_type=None):
+    """Stop the journalnode service."""
+    import params
+    env.set_params(params)
+    service(
+      action="stop", name="journalnode", user=params.hdfs_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+  def configure(self, env):
+    """Declare the edits directory and apply the common hdfs() configuration."""
+    import params
+    Directory(params.jn_edits_dir,
+              create_parents=True,
+              cd_access="a",
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    env.set_params(params)
+    hdfs()
+    pass
+  def status(self, env):
+    """Check the process recorded in status_params.journalnode_pid_file."""
+    import status_params
+    env.set_params(status_params)
+    check_process_status(status_params.journalnode_pid_file)
+  def security_status(self, env):
+    """
+    Report the security state through put_structured_out: "SECURED_KERBEROS",
+    "UNSECURED" or "ERROR", plus the issues found when validation fails.
+    """
+    import status_params
+    env.set_params(status_params)
+    # NOTE(review): the core-site property names in this method were empty
+    # strings in this copy of the patch; the hadoop.security.* names are
+    # restored from the upstream script - confirm
+    props_value_check = {"hadoop.security.authentication": "kerberos",
+                         "hadoop.security.authorization": "true"}
+    props_empty_check = ["hadoop.security.auth_to_local"]
+    props_read_check = None
+    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+                                                props_read_check)
+    props_value_check = None
+    props_empty_check = ['dfs.journalnode.keytab.file',
+                         'dfs.journalnode.kerberos.principal']
+    props_read_check = ['dfs.journalnode.keytab.file']
+    hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+                                                props_read_check)
+    hdfs_expectations = {}
+    hdfs_expectations.update(hdfs_site_expectations)
+    hdfs_expectations.update(core_site_expectations)
+    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+                                                 {'core-site.xml': FILE_TYPE_XML})
+    if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+        security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+      if not result_issues:  # If all validations passed successfully
+        try:
+          # Double check the dict before calling execute
+          # NOTE(review): the expectations above use 'dfs.journalnode.keytab.file'
+          # while this lookup uses 'dfs.journalnode.kerberos.keytab.file', and only
+          # core-site.xml is loaded into security_params - confirm the intended keys
+          if ('hdfs-site' not in security_params or
+                  'dfs.journalnode.kerberos.keytab.file' not in security_params['hdfs-site'] or
+                  'dfs.journalnode.kerberos.principal' not in security_params['hdfs-site']):
+            self.put_structured_out({"securityState": "UNSECURED"})
+            self.put_structured_out(
+              {"securityIssuesFound": "Keytab file or principal are not set property."})
+            return
+          cached_kinit_executor(status_params.kinit_path_local,
+                                status_params.hdfs_user,
+                                security_params['hdfs-site']['dfs.journalnode.kerberos.keytab.file'],
+                                security_params['hdfs-site']['dfs.journalnode.kerberos.principal'],
+                                status_params.hostname,
+                                status_params.tmp_dir)
+          self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+        except Exception as e:
+          self.put_structured_out({"securityState": "ERROR"})
+          self.put_structured_out({"securityStateErrorInfo": str(e)})
+      else:
+        issues = []
+        for cf in result_issues:
+          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+        self.put_structured_out({"securityState": "UNSECURED"})
+    else:
+      self.put_structured_out({"securityState": "UNSECURED"})
+# Run the JournalNode command script when this file is executed directly.
+if __name__ == "__main__":
+  JournalNode().execute()
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
new file mode 100755
index 0000000..91d93a6
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/
@@ -0,0 +1,136 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+import time
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.default import default
+from resource_management.core.exceptions import Fail
+from utils import get_jmx_data
+from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
+def post_upgrade_check():
+  """
+  Ensure all journal nodes are up and quorum is established
+  Rolls the edit log, reads LastAppliedOrWrittenTxId from the active Namenode's
+  JournalTransactionInfo and waits for the Journalnodes to reach it.
+  :return:
+  :raises Fail: fewer than 3 Journalnodes, Namenode HA addresses or JMX data
+    unavailable, or the Journalnodes did not reach the transaction id
+  """
+  import params
+  Logger.info("Ensuring Journalnode quorum is established")
+  if params.security_enabled:
+    Execute(params.jn_kinit_cmd, user=params.hdfs_user)
+  time.sleep(5)
+  hdfs_roll_edits()
+  time.sleep(5)
+  all_journal_node_hosts = default("/clusterHostInfo/journalnode_hosts", [])
+  if len(all_journal_node_hosts) < 3:
+    raise Fail("Need at least 3 Journalnodes to maintain a quorum")
+  try:
+    namenode_ha = NamenodeHAState()
+  except ValueError as err:
+    raise Fail("Could not retrieve Namenode HA addresses. Error: " + str(err))
+  nn_address = namenode_ha.get_address(NAMENODE_STATE.ACTIVE)
+  nn_data = get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem', 'JournalTransactionInfo',
+                         namenode_ha.is_encrypted())
+  if not nn_data:
+    raise Fail("Could not retrieve JournalTransactionInfo from JMX")
+  # Only the key lookup is guarded, so a KeyError raised anywhere else is not
+  # reported as a missing LastAppliedOrWrittenTxId.
+  try:
+    last_txn_id = int(nn_data['LastAppliedOrWrittenTxId'])
+  except KeyError:
+    raise Fail("JournalTransactionInfo does not have key LastAppliedOrWrittenTxId from JMX info")
+  if not ensure_jns_have_new_txn(all_journal_node_hosts, last_txn_id):
+    raise Fail("Could not ensure that all Journal nodes have a new log transaction id")
+def hdfs_roll_edits():
+  """
+  Run 'hdfs dfsadmin -rollEdits' as params.hdfs_user with a single try.
+  Rolling the logs lets the Namenode connect to the Journalnode.
+  HDFS_CLIENT needs to be a dependency of JOURNALNODE, and the caller must
+  kinit before calling this command.
+  """
+  import params
+  # TODO, this will be to be doc'ed since existing IOP 4.0 clusters will needs HDFS_CLIENT on all JOURNALNODE hosts
+  Execute('hdfs dfsadmin -rollEdits', tries=1, user=params.hdfs_user)
+def ensure_jns_have_new_txn(nodes, last_txn_id):
+  """
+  Poll each Journalnode through get_jmx_data until every one reports a
+  LastWrittenTxId of at least last_txn_id, retrying every 10 seconds for up to
+  3 minutes.
+  :param nodes: List of Journalnodes
+  :param last_txn_id: Integer of last transaction id
+  :return: Return true on success, false otherwise
+  :raises Fail: when the Journalnode address or port is not available in params
+  """
+  import params
+  if params.journalnode_address is None:
+    raise Fail("Could not retrieve Journal node address")
+  if params.journalnode_port is None:
+    raise Fail("Could not retrieve Journalnode port")
+  time_out_secs = 3 * 60
+  step_time_secs = 10
+  iterations = time_out_secs // step_time_secs
+  protocol = "https" if params.https_only else "http"
+  # Nodes already seen at or beyond last_txn_id; a set keeps a node from being
+  # counted more than once.
+  updated_nodes = set()
+  Logger.info("Checking if all Journalnodes are updated.")
+  for i in range(iterations):
+    Logger.info('Try %d out of %d' % (i+1, iterations))
+    for node in nodes:
+      # JN already meets condition, skip it
+      if node in updated_nodes:
+        continue
+      url = '%s://%s:%s' % (protocol, node, params.journalnode_port)
+      data = get_jmx_data(url, 'Journal-', 'LastWrittenTxId')
+      if data:
+        txn_id = int(data)
+        if txn_id >= last_txn_id:
+          Logger.info("Journalnode %s has a higher transaction id: %s" % (node, str(data)))
+          updated_nodes.add(node)
+        else:
+          Logger.info("Journalnode %s is still on transaction id: %s" % (node, str(data)))
+    # Checked right after the polling round so a completed round (or an empty
+    # node list) returns without another sleep.
+    if all(node in updated_nodes for node in nodes):
+      Logger.info("All journal nodes are updated")
+      return True
+    Logger.info("Sleeping for %d secs" % step_time_secs)
+    time.sleep(step_time_secs)
+  return False
\ No newline at end of file