You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ja...@apache.org on 2015/12/11 20:53:29 UTC

ambari git commit: AMBARI-11634. Kafka Service Check completes successfully however the stdout indicates an error creating a previously created topic. (Sriharsha Chintalapani via Jaimin)

Repository: ambari
Updated Branches:
  refs/heads/trunk d342a7627 -> 7c617e64f


AMBARI-11634. Kafka Service Check completes successfully however the stdout indicates an error creating a previously created topic. (Sriharsha Chintalapani via Jaimin)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7c617e64
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7c617e64
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7c617e64

Branch: refs/heads/trunk
Commit: 7c617e64f3f465c91894d790c0095636a13f4a5c
Parents: d342a76
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Fri Dec 11 11:52:36 2015 -0800
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Fri Dec 11 11:53:02 2015 -0800

----------------------------------------------------------------------
 .../0.8.1.2.2/package/scripts/service_check.py  | 23 ++++++++++++--------
 1 file changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7c617e64/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
index bcf62c5..0f3a417 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
@@ -22,25 +22,30 @@ from resource_management.libraries.functions.validate import call_and_match_outp
 from resource_management.libraries.functions.format import format
 from resource_management.core.logger import Logger
 from resource_management.core import sudo
+import subprocess
 
 class ServiceCheck(Script):
   def service_check(self, env):
     import params
     env.set_params(params)
 
-    # TODO, Kafka was introduced in HDP 2.2 but will not support running in a kerberized cluster until HDP 2.3 (tentatively)
-    # Kafka uses its own Zookeeper instance and it does not yet have the capability of running in a secure mode.
+    # TODO: The Kafka service check should be more robust. It should get all the broker_hosts,
+    # produce some messages, and check that the consumer reads the same number of messages.
+    
     kafka_config = self.read_kafka_config()
-
+    topic = "ambari_kafka_service_check"
     create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
     create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
-
     source_cmd = format("source {conf_dir}/kafka-env.sh")
-    create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic ambari_kafka_service_check --partitions 1 --replication-factor 1")
-    command = source_cmd + " ; " + create_topic_cmd
-
-    Logger.info("Running kafka create topic command: %s" % command)
-    call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)
+    topic_exists_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --topic {topic} --list")
+    topic_exists_cmd_p = subprocess.Popen(topic_exists_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    topic_exists_cmd_out, topic_exists_cmd_err = topic_exists_cmd_p.communicate()
+    # Run the create topic command only if the topic doesn't exist.
+    if topic not in topic_exists_cmd_out:
+      create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1")
+      command = source_cmd + " ; " + create_topic_cmd
+      Logger.info("Running kafka create topic command: %s" % command)
+      call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)
 
   def read_kafka_config(self):
     import params