You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/12/02 19:35:12 UTC
ambari git commit: AMBARI-14147. RU: Kafka broker restart failed on
downgrade from HDP 2.3 to 2.2 (alejandro)
Repository: ambari
Updated Branches:
refs/heads/trunk 2a970f8de -> 4b57a0f8a
AMBARI-14147. RU: Kafka broker restart failed on downgrade from HDP 2.3 to 2.2 (alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4b57a0f8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4b57a0f8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4b57a0f8
Branch: refs/heads/trunk
Commit: 4b57a0f8aa7329526b8c53bd7ad60f73cc19649d
Parents: 2a970f8
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Tue Dec 1 16:18:24 2015 -0800
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Wed Dec 2 10:35:05 2015 -0800
----------------------------------------------------------------------
.../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 68 +++++++++++++-------
.../0.8.1.2.2/package/scripts/kafka_broker.py | 8 +--
.../HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml | 3 +-
.../HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml | 3 +-
.../HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml | 1 -
.../HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml | 3 +-
6 files changed, 50 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 2bd93d2..a8bd9d5 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -17,13 +17,19 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
-from resource_management import *
+from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
from resource_management.libraries.resources.properties_file import PropertiesFile
from resource_management.libraries.resources.template_config import TemplateConfig
-import os
+from resource_management.core.resources.system import Directory, File, Link
+from resource_management.core.source import StaticFile, Template, InlineTemplate
+from resource_management.libraries.functions import format
+
+
+from resource_management.core.logger import Logger
-def kafka():
+def kafka(upgrade_type=None):
import params
Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir],
@@ -35,32 +41,44 @@ def kafka():
)
kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
- # This still has an issue of out of alphabetical order of hostnames can get out of order broker.id assigned to them for HDP-2.2.
- # Since from HDP-2.3 kafka is handling the generation of broker.id ambari doesn't need to generate one.
- if params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2.0.0') >= 0 and compare_versions(params.hdp_stack_version, '2.3.0.0') < 0 :
+ # This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2.
+ # Starting in HDP 2.3, Kafka handles the generation of broker.id so Ambari doesn't have to.
+
+ effective_version = params.hdp_stack_version if upgrade_type is None else format_hdp_stack_version(params.version)
+ Logger.info(format("Effective stack version: {effective_version}"))
+
+ if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.2.0.0') >= 0 and compare_versions(effective_version, '2.3.0.0') < 0:
+ if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
kafka_server_config['broker.id'] = brokerid
-
- #listeners and advertised.listeners are only added in 2.3.0.0 onwards.
- if params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.3.0.0') >= 0:
- if params.security_enabled and params.kafka_kerberos_enabled:
- listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
- if "SASL" not in listeners:
- listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
- kafka_server_config['listeners'] = listeners
- kafka_server_config['advertised.listeners'] = listeners
- else:
- listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
- kafka_server_config['listeners'] = listeners
- if 'advertised.listeners' in kafka_server_config:
- advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
- kafka_server_config['advertised.listeners'] = advertised_listeners
+ Logger.info(format("Calculating broker.id as {brokerid}"))
+
+ # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
+ if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.3.0.0') >= 0:
+ listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
+ Logger.info(format("Kafka listeners: {listeners}"))
+
+ if params.security_enabled and params.kafka_kerberos_enabled:
+ Logger.info("Kafka kerberos security is enabled.")
+ if "SASL" not in listeners:
+ listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
+
+ kafka_server_config['listeners'] = listeners
+ kafka_server_config['advertised.listeners'] = listeners
+ Logger.info(format("Kafka advertised listeners: {listeners}"))
+ else:
+ kafka_server_config['listeners'] = listeners
+
+ if 'advertised.listeners' in kafka_server_config:
+ advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+ kafka_server_config['advertised.listeners'] = advertised_listeners
+ Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
else:
- kafka_server_config['host.name'] = params.hostname
+ kafka_server_config['host.name'] = params.hostname
- if(params.has_metric_collector):
- kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
- kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
+ if params.has_metric_collector:
+ kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
+ kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
kafka_data_dir = kafka_server_config['log.dirs']
Directory(filter(None,kafka_data_dir.split(",")),
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
index 3caeb6d..6fcf08a 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
@@ -38,10 +38,10 @@ class KafkaBroker(Script):
def install(self, env):
self.install_packages(env)
- def configure(self, env):
+ def configure(self, env, upgrade_type=None):
import params
env.set_params(params)
- kafka()
+ kafka(upgrade_type=upgrade_type)
def pre_upgrade_restart(self, env, upgrade_type=None):
import params
@@ -64,13 +64,13 @@ class KafkaBroker(Script):
if compare_versions(src_version, '2.3.4.0') < 0 and compare_versions(dst_version, '2.3.4.0') >= 0:
# Calling the acl migration script requires the configs to be present.
- self.configure(env)
+ self.configure(env, upgrade_type=upgrade_type)
upgrade.run_migration(env, upgrade_type)
def start(self, env, upgrade_type=None):
import params
env.set_params(params)
- self.configure(env)
+ self.configure(env, upgrade_type=upgrade_type)
if params.is_supported_kafka_ranger:
setup_ranger_kafka() #Ranger Kafka Plugin related call
daemon_cmd = format('source {params.conf_dir}/kafka-env.sh ; {params.kafka_bin} start')
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
index f4c7f61..2341dcc 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
@@ -378,11 +378,10 @@
</service>
</group>
- <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+ <group xsi:type="cluster" name="HDFS_LEAVE_SAFEMODE" title="HDFS - Wait to leave Safemode">
<service-check>false</service-check>
<skippable>true</skippable>
<supports-auto-skip-failure>false</supports-auto-skip-failure>
- <direction>UPGRADE</direction>
<execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
<task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
index b48184e..363e58f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
@@ -288,11 +288,10 @@
</service>
</group>
- <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+ <group xsi:type="cluster" name="HDFS_LEAVE_SAFEMODE" title="HDFS - Wait to leave Safemode">
<service-check>false</service-check>
<skippable>true</skippable>
<supports-auto-skip-failure>false</supports-auto-skip-failure>
- <direction>UPGRADE</direction>
<execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
<task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
index 766f443..eaabf55 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
@@ -556,7 +556,6 @@
<service-check>false</service-check>
<skippable>true</skippable>
<supports-auto-skip-failure>false</supports-auto-skip-failure>
- <direction>UPGRADE</direction>
<execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
<task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
index 0c18f32..0df26c5 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
@@ -329,11 +329,10 @@
</service>
</group>
- <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+ <group xsi:type="cluster" name="HDFS_LEAVE_SAFEMODE" title="HDFS - Wait to leave Safemode">
<service-check>false</service-check>
<skippable>true</skippable>
<supports-auto-skip-failure>false</supports-auto-skip-failure>
- <direction>UPGRADE</direction>
<execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
<task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">