You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/12/15 00:46:58 UTC
ambari git commit: AMBARI-14362. Express Upgrade: Kafka broker
restart failed during EU from 2.2.9 to 2.3.2 with customized service user
accounts (Dmitry Lysnichenko via alejandro)
Repository: ambari
Updated Branches:
refs/heads/trunk 94fd36de3 -> 55be31abc
AMBARI-14362. Express Upgrade: Kafka broker restart failed during EU from 2.2.9 to 2.3.2 with customized service user accounts (Dmitry Lysnichenko via alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/55be31ab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/55be31ab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/55be31ab
Branch: refs/heads/trunk
Commit: 55be31abcb3f8f1419df3387eb93bf1f51f18e54
Parents: 94fd36d
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Mon Dec 14 15:45:02 2015 -0800
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Mon Dec 14 15:45:02 2015 -0800
----------------------------------------------------------------------
.../core/resources/system.py | 8 ++-
.../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 53 ++++++++++++++++----
.../0.8.1.2.2/package/scripts/kafka_broker.py | 5 ++
.../stacks/2.2/KAFKA/test_kafka_broker.py | 41 +++++++++++++++
4 files changed, 95 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index 83abc64..558bb2b 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -58,8 +58,12 @@ class Directory(Resource):
mode = ResourceArgument()
owner = ResourceArgument()
group = ResourceArgument()
- follow = BooleanArgument(default=True) # follow links?
- recursive = BooleanArgument(default=False) # this work for 'create', 'delete' is anyway recursive
+ follow = BooleanArgument(default=True) # follow links?
+ """
+ Applies to 'create' only; 'delete' is always recursive.
+ 'recursive' means only "mkdir -p"; it does NOT perform a recursive chown/chmod.
+ """
+ recursive = BooleanArgument(default=False)
"""
Grants x-bit for all the folders up-to the directory
http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index a8bd9d5..f02be4b 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -17,28 +17,23 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+import collections
import os
from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
from resource_management.libraries.resources.properties_file import PropertiesFile
from resource_management.libraries.resources.template_config import TemplateConfig
-from resource_management.core.resources.system import Directory, File, Link
+from resource_management.core.resources.system import Directory, Execute, File, Link
from resource_management.core.source import StaticFile, Template, InlineTemplate
from resource_management.libraries.functions import format
from resource_management.core.logger import Logger
+
def kafka(upgrade_type=None):
import params
-
- Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir],
- mode=0755,
- cd_access='a',
- owner=params.kafka_user,
- group=params.user_group,
- recursive=True
- )
+ ensure_base_directories()
kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
# This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2.
@@ -81,12 +76,14 @@ def kafka(upgrade_type=None):
kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
kafka_data_dir = kafka_server_config['log.dirs']
- Directory(filter(None,kafka_data_dir.split(",")),
+ kafka_data_dirs = filter(None, kafka_data_dir.split(","))
+ Directory(kafka_data_dirs[:], # Todo: remove list copy when AMBARI-14373 is fixed
mode=0755,
cd_access='a',
owner=params.kafka_user,
group=params.user_group,
recursive=True)
+ set_dir_ownership(kafka_data_dirs)
PropertiesFile("server.properties",
dir=params.conf_dir,
@@ -139,6 +136,7 @@ def mutable_config_dict(kafka_broker_config):
kafka_server_config[key] = value
return kafka_server_config
+
# Used to work around the hardcoded pid/log dir used by the Kafka bash process launcher
def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
import params
@@ -172,6 +170,7 @@ def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
owner=params.kafka_user,
group=params.user_group,
recursive=True)
+ set_dir_ownership(kafka_managed_dir)
if backup_folder_path:
# Restore backed up files to current relevant dirs if needed - will be triggered only when changing to/from default path;
@@ -197,6 +196,7 @@ def backup_dir_contents(dir_path, backup_folder_suffix):
group=params.user_group,
recursive=True
)
+ set_dir_ownership(backup_destination_path)
# Safely copy top-level contents to backup folder
for file in os.listdir(dir_path):
File(os.path.join(backup_destination_path, file),
@@ -204,3 +204,36 @@ def backup_dir_contents(dir_path, backup_folder_suffix):
content = StaticFile(os.path.join(dir_path,file)))
return backup_destination_path
+
+
+def ensure_base_directories():
+ """
+ Make basic Kafka directories, and make sure that their ownership is correct
+ """
+ import params
+ base_dirs = [params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir]
+ Directory(base_dirs[:], # Todo: remove list copy when AMBARI-14373 is fixed
+ mode=0755,
+ cd_access='a',
+ owner=params.kafka_user,
+ group=params.user_group,
+ recursive=True
+ )
+ set_dir_ownership(base_dirs)
+
+
+def set_dir_ownership(targets):
+ import params
+ if isinstance(targets, collections.Iterable):
+ directories = targets
+ else: # If target is a single object, convert it to list
+ directories = [targets]
+ for directory in directories:
+ # Guard against an empty path or a bare "/": a recursive chown
+ # there may corrupt filesystem permissions
+ if len(directory) > 1:
+ Execute(('chown', '-R', format("{kafka_user}:{user_group}"), directory),
+ sudo=True)
+ else:
+ Logger.warning("Permissions for the folder \"%s\" were not updated due to "
+ "empty path passed: " % directory)
http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
index 6c7a776..3f650bd 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
@@ -25,6 +25,7 @@ from resource_management.libraries.functions import Direction
from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.check_process_status import check_process_status
+from kafka import ensure_base_directories
import upgrade
from kafka import kafka
@@ -85,6 +86,10 @@ class KafkaBroker(Script):
def stop(self, env, upgrade_type=None):
import params
env.set_params(params)
+ # Kafka package scripts change permissions on folders, so we have to
+ # restore permissions after installing repo version bits
+ # before attempting to stop Kafka Broker
+ ensure_base_directories()
daemon_cmd = format('source {params.conf_dir}/kafka-env.sh; {params.kafka_bin} stop')
Execute(daemon_cmd,
user=params.kafka_user,
http://git-wip-us.apache.org/repos/asf/ambari/blob/55be31ab/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py b/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
index 9be8198..e368f2a 100644
--- a/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
+++ b/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
@@ -61,6 +61,27 @@ class TestKafkaBroker(RMFTestCase):
cd_access = 'a'
)
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/var/log/kafka'),
+ sudo = True)
+
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/var/run/kafka'),
+ sudo = True)
+
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', '/usr/hdp/current/kafka-broker/config'),
+ sudo = True)
+
+ self.assertResourceCalled('Directory', '/tmp/log/dir',
+ owner = 'kafka',
+ recursive = True,
+ group = 'hadoop',
+ mode = 0755,
+ cd_access = 'a',
+ )
+
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/tmp/log/dir'),
+ sudo = True)
+
+
@patch("os.path.islink")
@patch("os.path.realpath")
def test_configure_custom_paths_default(self, realpath_mock, islink_mock):
@@ -97,6 +118,26 @@ class TestKafkaBroker(RMFTestCase):
cd_access = 'a'
)
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/customdisk/var/log/kafka'),
+ sudo = True)
+
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/customdisk/var/run/kafka'),
+ sudo = True)
+
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', '/usr/hdp/current/kafka-broker/config'),
+ sudo = True)
+
+ self.assertResourceCalled('Directory', '/tmp/log/dir',
+ owner = 'kafka',
+ recursive = True,
+ group = 'hadoop',
+ mode = 0755,
+ cd_access = 'a',
+ )
+
+ self.assertResourceCalled('Execute', ('chown', '-R', u'kafka:hadoop', u'/tmp/log/dir'),
+ sudo = True)
+
self.assertTrue(islink_mock.called)
self.assertTrue(realpath_mock.called)