You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:49 UTC

[04/36] git commit: KAFKA-879; In the system test, read the new leader from ZooKeeper instead of from the broker log on completion of the become-leader state transition; patched by John Fung; reviewed by Jun Rao

KAFKA-879; In the system test, read the new leader from ZooKeeper instead of from the broker log on completion of the become-leader state transition; patched by John Fung; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3aa3ef0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3aa3ef0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3aa3ef0

Branch: refs/heads/trunk
Commit: d3aa3ef073fe773a10168f925a45747f52c4e3c0
Parents: 3817857
Author: John Fung <fu...@gmail.com>
Authored: Tue Jul 23 09:38:52 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 23 09:38:52 2013 -0700

----------------------------------------------------------------------
 .../replication_testsuite/replica_basic_test.py | 36 ++++++-------
 system_test/utils/kafka_system_test_utils.py    | 57 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d3aa3ef0/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 40c1157..17414ad 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -231,7 +231,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                     # ==============================================
                     if brokerType == "leader" or brokerType == "follower":
                         self.log_message("looking up leader")
-                        leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
+                        leaderDict = kafka_system_test_utils.get_leader_attributes(self.systemTestEnv, self.testcaseEnv)
 
                         # ==========================
                         # leaderDict looks like this:
@@ -285,10 +285,10 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                             kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
                 
                             # trigger leader re-election by stopping leader to get re-election latency
-                            reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
-                            latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
-                            self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
-                            self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
+                            #reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
+                            #latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
+                            #self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
+                            #self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
 
                         elif brokerType == "follower":
                             # stopping Follower
@@ -330,19 +330,19 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                 # while loop
 
                 # update Leader Election Latency MIN/MAX to testcaseEnv.validationStatusDict
-                self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
-                try:
-                    self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
-                        min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
-                except:
-                    pass
-
-                self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
-                try:
-                    self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
-                        max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
-                except:
-                    pass
+                #self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
+                #try:
+                #    self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
+                #        min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+                #except:
+                #    pass
+                #
+                #self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
+                #try:
+                #    self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
+                #        max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+                #except:
+                #    pass
 
                 # =============================================
                 # tell producer to stop

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3aa3ef0/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index ae393bc..de16a34 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -2211,3 +2211,60 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
     else:
         validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED"
 
+def get_leader_attributes(systemTestEnv, testcaseEnv):
+
+    logger.info("Querying Zookeeper for leader info ...", extra=d)
+
+    # keep track of leader data in this dict such as broker id & entity id
+    leaderDict = {} 
+
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList      = testcaseEnv.testcaseConfigsList
+
+    zkDictList         = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    firstZkDict        = zkDictList[0]
+    hostname           = firstZkDict["hostname"]
+    zkEntityId         = firstZkDict["entity_id"]
+    clientPort         = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort")
+    kafkaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
+    javaHome           = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
+    kafkaRunClassBin   = kafkaHome + "/bin/kafka-run-class.sh"
+
+    # this should have been updated in start_producer_in_thread
+    producerTopicsString = testcaseEnv.producerTopicsString
+    topics = producerTopicsString.split(',')
+    zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/0/state"
+    brokerid   = ''
+
+    cmdStrList = ["ssh " + hostname,
+                  "\"JAVA_HOME=" + javaHome,
+                  kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
+                  "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                  zkQueryStr + " 2> /dev/null | tail -1\""]
+    cmdStr = " ".join(cmdStrList)
+    logger.debug("executing command [" + cmdStr + "]", extra=d)
+
+    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+    for line in subproc.stdout.readlines():
+        logger.debug("zk returned : " + line, extra=d)
+        if "\"leader\"" in line:
+            line = line.rstrip('\n')
+            json_data = json.loads(line)
+            for key,val in json_data.items():
+                if key == 'leader':
+                    brokerid = str(val)
+
+            leaderDict["brokerid"]  = brokerid
+            leaderDict["topic"]     = topics[0]
+            leaderDict["partition"] = '0'
+            leaderDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval(
+                                          tcConfigsList, "broker.id", brokerid, "entity_id")
+            leaderDict["hostname"]  = system_test_utils.get_data_by_lookup_keyval(
+                                          clusterConfigsList, "entity_id", leaderDict["entity_id"], "hostname")
+            break
+
+    print leaderDict
+    return leaderDict
+
+
+