You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/12/02 19:35:12 UTC

ambari git commit: AMBARI-14147. RU: Kafka broker restart failed on downgrade from HDP 2.3 to 2.2 (alejandro)

Repository: ambari
Updated Branches:
  refs/heads/trunk 2a970f8de -> 4b57a0f8a


AMBARI-14147. RU: Kafka broker restart failed on downgrade from HDP 2.3 to 2.2 (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4b57a0f8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4b57a0f8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4b57a0f8

Branch: refs/heads/trunk
Commit: 4b57a0f8aa7329526b8c53bd7ad60f73cc19649d
Parents: 2a970f8
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Tue Dec 1 16:18:24 2015 -0800
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Wed Dec 2 10:35:05 2015 -0800

----------------------------------------------------------------------
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    | 68 +++++++++++++-------
 .../0.8.1.2.2/package/scripts/kafka_broker.py   |  8 +--
 .../HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml |  3 +-
 .../HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml |  3 +-
 .../HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml |  1 -
 .../HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml |  3 +-
 6 files changed, 50 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 2bd93d2..a8bd9d5 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -17,13 +17,19 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+import os
 
-from resource_management import *
+from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
 from resource_management.libraries.resources.properties_file import PropertiesFile
 from resource_management.libraries.resources.template_config import TemplateConfig
-import os
+from resource_management.core.resources.system import Directory, File, Link
+from resource_management.core.source import StaticFile, Template, InlineTemplate
+from resource_management.libraries.functions import format
+
+
+from resource_management.core.logger import Logger
 
-def kafka():
+def kafka(upgrade_type=None):
     import params
 
     Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir],
@@ -35,32 +41,44 @@ def kafka():
           )
 
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
-    # This still has an issue of out of alphabetical order of hostnames can get out of order broker.id assigned to them for HDP-2.2.
-    # Since from HDP-2.3 kafka is handling the generation of broker.id ambari doesn't need to generate one.
-    if params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2.0.0') >= 0 and compare_versions(params.hdp_stack_version, '2.3.0.0') < 0 :
+    # This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2.
+    # Starting in HDP 2.3, Kafka handles the generation of broker.id so Ambari doesn't have to.
+
+    effective_version = params.hdp_stack_version if upgrade_type is None else format_hdp_stack_version(params.version)
+    Logger.info(format("Effective stack version: {effective_version}"))
+
+    if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.2.0.0') >= 0 and compare_versions(effective_version, '2.3.0.0') < 0:
+      if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
         brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
         kafka_server_config['broker.id'] = brokerid
-
-    #listeners and advertised.listeners are only added in 2.3.0.0 onwards.
-    if params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.3.0.0') >= 0:
-        if params.security_enabled and params.kafka_kerberos_enabled:
-            listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
-            if "SASL" not in listeners:
-                listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
-            kafka_server_config['listeners'] = listeners
-            kafka_server_config['advertised.listeners'] = listeners
-        else:
-            listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
-            kafka_server_config['listeners'] = listeners
-            if 'advertised.listeners' in kafka_server_config:
-                advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
-                kafka_server_config['advertised.listeners'] = advertised_listeners
+        Logger.info(format("Calculating broker.id as {brokerid}"))
+
+    # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
+    if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.3.0.0') >= 0:
+      listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
+      Logger.info(format("Kafka listeners: {listeners}"))
+
+      if params.security_enabled and params.kafka_kerberos_enabled:
+        Logger.info("Kafka kerberos security is enabled.")
+        if "SASL" not in listeners:
+          listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
+
+        kafka_server_config['listeners'] = listeners
+        kafka_server_config['advertised.listeners'] = listeners
+        Logger.info(format("Kafka advertised listeners: {listeners}"))
+      else:
+        kafka_server_config['listeners'] = listeners
+
+        if 'advertised.listeners' in kafka_server_config:
+          advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+          kafka_server_config['advertised.listeners'] = advertised_listeners
+          Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
     else:
-        kafka_server_config['host.name'] = params.hostname
+      kafka_server_config['host.name'] = params.hostname
 
-    if(params.has_metric_collector):
-            kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
-            kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
+    if params.has_metric_collector:
+      kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
+      kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
 
     kafka_data_dir = kafka_server_config['log.dirs']
     Directory(filter(None,kafka_data_dir.split(",")),

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
index 3caeb6d..6fcf08a 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
@@ -38,10 +38,10 @@ class KafkaBroker(Script):
   def install(self, env):
     self.install_packages(env)
 
-  def configure(self, env):
+  def configure(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    kafka()
+    kafka(upgrade_type=upgrade_type)
 
   def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
@@ -64,13 +64,13 @@ class KafkaBroker(Script):
 
       if compare_versions(src_version, '2.3.4.0') < 0 and compare_versions(dst_version, '2.3.4.0') >= 0:
         # Calling the acl migration script requires the configs to be present.
-        self.configure(env)
+        self.configure(env, upgrade_type=upgrade_type)
         upgrade.run_migration(env, upgrade_type)
 
   def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    self.configure(env)
+    self.configure(env, upgrade_type=upgrade_type)
     if params.is_supported_kafka_ranger:
       setup_ranger_kafka() #Ranger Kafka Plugin related call 
     daemon_cmd = format('source {params.conf_dir}/kafka-env.sh ; {params.kafka_bin} start')

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
index f4c7f61..2341dcc 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
@@ -378,11 +378,10 @@
       </service>
     </group>
 
-    <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+    <group xsi:type="cluster" name="HDFS_LEAVE_SAFEMODE" title="HDFS - Wait to leave Safemode">
       <service-check>false</service-check>
       <skippable>true</skippable>
       <supports-auto-skip-failure>false</supports-auto-skip-failure>
-      <direction>UPGRADE</direction>
 
       <execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
         <task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
index b48184e..363e58f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
@@ -288,11 +288,10 @@
       </service>
     </group>
 
-    <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+    <group xsi:type="cluster" name="HDFS_LEAVE_SAFEMODE" title="HDFS - Wait to leave Safemode">
       <service-check>false</service-check>
       <skippable>true</skippable>
       <supports-auto-skip-failure>false</supports-auto-skip-failure>
-      <direction>UPGRADE</direction>
 
       <execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
         <task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
index 766f443..eaabf55 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml
@@ -556,7 +556,6 @@
       <service-check>false</service-check>
       <skippable>true</skippable>
       <supports-auto-skip-failure>false</supports-auto-skip-failure>
-      <direction>UPGRADE</direction>
 
       <execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
         <task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b57a0f8/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
index 0c18f32..0df26c5 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml
@@ -329,11 +329,10 @@
       </service>
     </group>
 
-    <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+    <group xsi:type="cluster" name="HDFS_LEAVE_SAFEMODE" title="HDFS - Wait to leave Safemode">
       <service-check>false</service-check>
       <skippable>true</skippable>
       <supports-auto-skip-failure>false</supports-auto-skip-failure>
-      <direction>UPGRADE</direction>
 
       <execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
         <task xsi:type="execute" hosts="all" summary="Wait for NameNode to leave Safemode">