You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:49 UTC
[04/36] git commit: kafka-879;
In system test, read the new leader from zookeeper instead of broker
log on completion of become-leader state transition; patched by John Fung;
reviewed by Jun Rao
kafka-879; In system test, read the new leader from zookeeper instead of broker log on completion of become-leader state transition; patched by John Fung; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3aa3ef0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3aa3ef0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3aa3ef0
Branch: refs/heads/trunk
Commit: d3aa3ef073fe773a10168f925a45747f52c4e3c0
Parents: 3817857
Author: John Fung <fu...@gmail.com>
Authored: Tue Jul 23 09:38:52 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 23 09:38:52 2013 -0700
----------------------------------------------------------------------
.../replication_testsuite/replica_basic_test.py | 36 ++++++-------
system_test/utils/kafka_system_test_utils.py | 57 ++++++++++++++++++++
2 files changed, 75 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d3aa3ef0/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 40c1157..17414ad 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -231,7 +231,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
# ==============================================
if brokerType == "leader" or brokerType == "follower":
self.log_message("looking up leader")
- leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
+ leaderDict = kafka_system_test_utils.get_leader_attributes(self.systemTestEnv, self.testcaseEnv)
# ==========================
# leaderDict looks like this:
@@ -285,10 +285,10 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
# trigger leader re-election by stopping leader to get re-election latency
- reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
- latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
- self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
- self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
+ #reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
+ #latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
+ #self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
+ #self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
elif brokerType == "follower":
# stopping Follower
@@ -330,19 +330,19 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
# while loop
# update Leader Election Latency MIN/MAX to testcaseEnv.validationStatusDict
- self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
- try:
- self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
- min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
- except:
- pass
-
- self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
- try:
- self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
- max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
- except:
- pass
+ #self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
+ #try:
+ # self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
+ # min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+ #except:
+ # pass
+ #
+ #self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
+ #try:
+ # self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
+ # max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+ #except:
+ # pass
# =============================================
# tell producer to stop
http://git-wip-us.apache.org/repos/asf/kafka/blob/d3aa3ef0/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index ae393bc..de16a34 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -2211,3 +2211,60 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
else:
validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED"
+def get_leader_attributes(systemTestEnv, testcaseEnv):
+
+ logger.info("Querying Zookeeper for leader info ...", extra=d)
+
+ # keep track of leader data in this dict such as broker id & entity id
+ leaderDict = {}
+
+ clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+ tcConfigsList = testcaseEnv.testcaseConfigsList
+
+ zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+ firstZkDict = zkDictList[0]
+ hostname = firstZkDict["hostname"]
+ zkEntityId = firstZkDict["entity_id"]
+ clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort")
+ kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
+ javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
+ kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
+
+ # this should have been updated in start_producer_in_thread
+ producerTopicsString = testcaseEnv.producerTopicsString
+ topics = producerTopicsString.split(',')
+ zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/0/state"
+ brokerid = ''
+
+ cmdStrList = ["ssh " + hostname,
+ "\"JAVA_HOME=" + javaHome,
+ kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
+ "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+ zkQueryStr + " 2> /dev/null | tail -1\""]
+ cmdStr = " ".join(cmdStrList)
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+
+ subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+ for line in subproc.stdout.readlines():
+ logger.debug("zk returned : " + line, extra=d)
+ if "\"leader\"" in line:
+ line = line.rstrip('\n')
+ json_data = json.loads(line)
+ for key,val in json_data.items():
+ if key == 'leader':
+ brokerid = str(val)
+
+ leaderDict["brokerid"] = brokerid
+ leaderDict["topic"] = topics[0]
+ leaderDict["partition"] = '0'
+ leaderDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval(
+ tcConfigsList, "broker.id", brokerid, "entity_id")
+ leaderDict["hostname"] = system_test_utils.get_data_by_lookup_keyval(
+ clusterConfigsList, "entity_id", leaderDict["entity_id"], "hostname")
+ break
+
+ print leaderDict
+ return leaderDict
+
+
+