You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/12/15 00:46:58 UTC

ambari git commit: AMBARI-14362. Express Upgrade: Kafka broker restart failed during EU from 2.2.9 to 2.3.2 with customized service user accounts (Dmitry Lysnichenko via alejandro)

Repository: ambari
Updated Branches:
  refs/heads/trunk 94fd36de3 -> 55be31abc


AMBARI-14362. Express Upgrade: Kafka broker restart failed during EU from 2.2.9 to 2.3.2 with customized service user accounts (Dmitry Lysnichenko via alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/55be31ab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/55be31ab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/55be31ab

Branch: refs/heads/trunk
Commit: 55be31abcb3f8f1419df3387eb93bf1f51f18e54
Parents: 94fd36d
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Mon Dec 14 15:45:02 2015 -0800
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Mon Dec 14 15:45:02 2015 -0800

----------------------------------------------------------------------
 .../core/resources/system.py                    |  8 ++-
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    | 53 ++++++++++++++++----
 .../0.8.1.2.2/package/scripts/kafka_broker.py   |  5 ++
 .../stacks/2.2/KAFKA/test_kafka_broker.py       | 41 +++++++++++++++
 4 files changed, 95 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index 83abc64..558bb2b 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -58,8 +58,12 @@ class Directory(Resource):
   mode = ResourceArgument()
   owner = ResourceArgument()
   group = ResourceArgument()
-  follow = BooleanArgument(default=True) # follow links?
-  recursive = BooleanArgument(default=False) # this work for 'create', 'delete' is anyway recursive
+  follow = BooleanArgument(default=True)  # follow links?
+  """
+  this works for 'create', 'delete' is anyway recursive
+  recursive means only "mkdir -p", it does NOT perform recursive chown/chmod
+  """
+  recursive = BooleanArgument(default=False)
   """
   Grants x-bit for all the folders up-to the directory
   

http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index a8bd9d5..f02be4b 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -17,28 +17,23 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+import collections
 import os
 
 from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
 from resource_management.libraries.resources.properties_file import PropertiesFile
 from resource_management.libraries.resources.template_config import TemplateConfig
-from resource_management.core.resources.system import Directory, File, Link
+from resource_management.core.resources.system import Directory, Execute, File, Link
 from resource_management.core.source import StaticFile, Template, InlineTemplate
 from resource_management.libraries.functions import format
 
 
 from resource_management.core.logger import Logger
 
+
 def kafka(upgrade_type=None):
     import params
-
-    Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir],
-              mode=0755,
-              cd_access='a',
-              owner=params.kafka_user,
-              group=params.user_group,
-              recursive=True
-          )
+    ensure_base_directories()
 
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
     # This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2.
@@ -81,12 +76,14 @@ def kafka(upgrade_type=None):
       kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
 
     kafka_data_dir = kafka_server_config['log.dirs']
-    Directory(filter(None,kafka_data_dir.split(",")),
+    kafka_data_dirs = filter(None, kafka_data_dir.split(","))
+    Directory(kafka_data_dirs[:],  # Todo: remove list copy when AMBARI-14373 is fixed
               mode=0755,
               cd_access='a',
               owner=params.kafka_user,
               group=params.user_group,
               recursive=True)
+    set_dir_ownership(kafka_data_dirs)
 
     PropertiesFile("server.properties",
                       dir=params.conf_dir,
@@ -139,6 +136,7 @@ def mutable_config_dict(kafka_broker_config):
         kafka_server_config[key] = value
     return kafka_server_config
 
+
 # Used to workaround the hardcoded pid/log dir used on the kafka bash process launcher
 def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
   import params
@@ -172,6 +170,7 @@ def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
               owner=params.kafka_user,
               group=params.user_group,
               recursive=True)
+    set_dir_ownership(kafka_managed_dir)
 
   if backup_folder_path:
     # Restore backed up files to current relevant dirs if needed - will be triggered only when changing to/from default path;
@@ -197,6 +196,7 @@ def backup_dir_contents(dir_path, backup_folder_suffix):
             group=params.user_group,
             recursive=True
   )
+  set_dir_ownership(backup_destination_path)
   # Safely copy top-level contents to backup folder
   for file in os.listdir(dir_path):
     File(os.path.join(backup_destination_path, file),
@@ -204,3 +204,36 @@ def backup_dir_contents(dir_path, backup_folder_suffix):
          content = StaticFile(os.path.join(dir_path,file)))
 
   return backup_destination_path
+
+
+def ensure_base_directories():
+  """
+  Create the Kafka log, pid and conf directories (mode 0755, owned by
+  kafka_user:user_group) and then make sure their ownership is correct.
+  """
+  import params
+  kafka_dirs = [params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir]
+  # Todo: remove list copy when AMBARI-14373 is fixed
+  Directory(list(kafka_dirs),
+            owner=params.kafka_user, group=params.user_group,
+            mode=0755, cd_access='a',
+            recursive=True)
+  # The original (uncopied) list is what gets handed to the ownership fix-up
+  set_dir_ownership(kafka_dirs)
+
+
+def set_dir_ownership(targets):
+  """Recursively chown targets (a path string or an iterable of paths) to
+  kafka_user:user_group via sudo; too-short paths are skipped with a warning."""
+  import params
+  # A string is itself Iterable: without this check a single path would be
+  # walked character by character and every character rejected as too short.
+  is_single = isinstance(targets, basestring) or not isinstance(targets, collections.Iterable)
+  for directory in ([targets] if is_single else targets):
+    # If path is empty or a single slash, may corrupt filesystem permissions
+    if len(directory) > 1:
+      Execute(('chown', '-R', format("{kafka_user}:{user_group}"), directory),
+            sudo=True)
+    else:
+      Logger.warning("Permissions for the folder \"%s\" were not updated due to "
+            "empty path passed: " % directory)

http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
index 6c7a776..3f650bd 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
@@ -25,6 +25,7 @@ from resource_management.libraries.functions import Direction
 from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.functions.check_process_status import check_process_status
+from kafka import ensure_base_directories
 
 import upgrade
 from kafka import kafka
@@ -85,6 +86,10 @@ class KafkaBroker(Script):
   def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
+    # Kafka package scripts change permissions on folders, so we have to
+    # restore permissions after installing repo version bits
+    # before attempting to stop Kafka Broker
+    ensure_base_directories()
     daemon_cmd = format('source {params.conf_dir}/kafka-env.sh; {params.kafka_bin} stop')
     Execute(daemon_cmd,
             user=params.kafka_user,

http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py b/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
index 9be8198..e368f2a 100644
--- a/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
+++ b/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
@@ -61,6 +61,27 @@ class TestKafkaBroker(RMFTestCase):
                               cd_access = 'a'
     )
 
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/var/log/kafka'),
+                              sudo = True)
+
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/var/run/kafka'),
+                              sudo = True)
+
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', '/usr/hdp/current/kafka-broker/config'),
+                              sudo = True)
+
+    self.assertResourceCalled('Directory', '/tmp/log/dir',
+                              owner = 'kafka',
+                              recursive = True,
+                              group = 'hadoop',
+                              mode = 0755,
+                              cd_access = 'a',
+    )
+
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/tmp/log/dir'),
+                              sudo = True)
+
+
   @patch("os.path.islink")
   @patch("os.path.realpath")
   def test_configure_custom_paths_default(self, realpath_mock, islink_mock):
@@ -97,6 +118,26 @@ class TestKafkaBroker(RMFTestCase):
                               cd_access = 'a'
     )
 
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/customdisk/var/log/kafka'),
+                              sudo = True)
+
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/customdisk/var/run/kafka'),
+                              sudo = True)
+
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', '/usr/hdp/current/kafka-broker/config'),
+                              sudo = True)
+
+    self.assertResourceCalled('Directory', '/tmp/log/dir',
+                              owner = 'kafka',
+                              recursive = True,
+                              group = 'hadoop',
+                              mode = 0755,
+                              cd_access = 'a',
+    )
+
+    self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/tmp/log/dir'),
+                              sudo = True)
+
     self.assertTrue(islink_mock.called)
     self.assertTrue(realpath_mock.called)