You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by am...@apache.org on 2018/07/02 18:43:52 UTC

[ambari] branch trunk updated: [AMBARI-24203] - Check for under-replicated partitions in Kafka service check (#1635)

This is an automated email from the ASF dual-hosted git repository.

amagyar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 60e54750 [AMBARI-24203] - Check for under-replicated partitions in Kafka service check (#1635)
60e54750 is described below

commit 60e547502b49f07c1c29077e71eb5a231baddc9f
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Mon Jul 2 14:43:49 2018 -0400

    [AMBARI-24203] - Check for under-replicated partitions in Kafka service check (#1635)
    
    * AMBARI-24203 - aligned with version in branch-2.6 and added commands to check for under replicated partitions
    
    * AMBARI-24203 - fixed case for Topic
---
 .../KAFKA/0.8.1/package/scripts/service_check.py   | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
index 748dbf2..0559a83 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
@@ -48,22 +48,24 @@ class ServiceCheck(Script):
       Logger.info('Kafka delete.topic.enable is not enabled. Skipping topic creation: %s' % topic)
       return
 
-  # run create topic command only if the topic doesn't exists
-    
-    delete_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --delete --topic {topic}")
-    create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1")
-    if topic in topic_exists_cmd_out:
-      # run delete topic and recreate the topic command only if the topic exists
-      command = source_cmd + " ; " + delete_topic_cmd + ";" + create_topic_cmd
-    else:
-      # run create topic command 
+    # run create topic command only if the topic doesn't exists
+    if topic not in topic_exists_cmd_out:
+      create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1")
       command = source_cmd + " ; " + create_topic_cmd
-    Logger.info("Running kafka create topic command: %s" % command)
-    call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)
+      Logger.info("Running kafka create topic command: %s" % command)
+      call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)
+
+    under_rep_cmd = format("{kafka_home}/bin/kafka-topics.sh --describe --zookeeper {kafka_config[zookeeper.connect]} --under-replicated-partitions")
+    under_rep_cmd_code, under_rep_cmd_out = shell.call(under_rep_cmd, logoutput=True, quiet=False, user=params.kafka_user)
+
+    if under_rep_cmd_code > 0:
+      raise Fail("Error encountered when attempting find under replicated partitions: {0}".format(under_rep_cmd_out))
+    elif len(under_rep_cmd_out) > 0 and "Topic" in under_rep_cmd_out:
+      raise Fail("Under replicated partitions found: {0}".format(under_rep_cmd_out))
 
   def read_kafka_config(self):
     import params
-    
+
     kafka_config = {}
     content = sudo.read_file(params.conf_dir + "/server.properties")
     for line in content.splitlines():
@@ -72,7 +74,7 @@ class ServiceCheck(Script):
 
       key, value = line.split("=")
       kafka_config[key] = value.replace("\n", "")
-    
+
     return kafka_config
 
 if __name__ == "__main__":