You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/25 22:08:45 UTC
git commit: kafka-791; Fix validation bugs in System Test;
patched by John Fung; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 e367f3ffb -> 26c50fac4
kafka-791; Fix validation bugs in System Test; patched by John Fung; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26c50fac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26c50fac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26c50fac
Branch: refs/heads/0.8
Commit: 26c50fac47802e4aa03793c60cb8316995bb37f1
Parents: e367f3f
Author: John Fung <fu...@gmail.com>
Authored: Mon Mar 25 14:08:24 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Mar 25 14:08:24 2013 -0700
----------------------------------------------------------------------
.../mirror_maker_testsuite/mirror_maker_test.py | 7 +-
.../replication_testsuite/replica_basic_test.py | 15 +-
system_test/utils/kafka_system_test_utils.py | 304 ++++++--------
system_test/utils/replication_utils.py | 3 +
system_test/utils/system_test_utils.py | 78 ++++-
5 files changed, 222 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/mirror_maker_testsuite/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py
index 48b0d25..098f531 100644
--- a/system_test/mirror_maker_testsuite/mirror_maker_test.py
+++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py
@@ -76,6 +76,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
testCasePathNameList.sort()
+ replicationUtils = ReplicationUtils(self)
+
# =============================================================
# launch each testcase one by one: testcase_1, testcase_2, ...
# =============================================================
@@ -282,9 +284,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
# validate the data matched and checksum
# =============================================
self.log_message("validating data matched")
- #kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
- kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv)
- kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+ kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "source")
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "target")
# =============================================
http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 3fc47d9..ce29240 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -77,6 +77,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
testCasePathNameList.sort()
+ replicationUtils = ReplicationUtils(self)
+
# =============================================================
# launch each testcase one by one: testcase_1, testcase_2, ...
# =============================================================
@@ -423,16 +425,15 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
self.log_message("validating data matched")
if logRetentionTest.lower() == "true":
- kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
- kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)
elif consumerMultiTopicsMode.lower() == "true":
- #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
- kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(
+ self.systemTestEnv, self.testcaseEnv, replicationUtils)
else:
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
- #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
- kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
-
+ kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+
# =============================================
# draw graphs
# =============================================
http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 9411405..9e58624 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -1189,17 +1189,21 @@ def get_message_checksum(logPathName):
return messageChecksumList
-def validate_data_matched(systemTestEnv, testcaseEnv):
+def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils):
+ logger.debug("#### Inside validate_data_matched", extra=d)
+
validationStatusDict = testcaseEnv.validationStatusDict
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+ consumerDuplicateCount = 0
+
for prodPerfCfg in prodPerfCfgList:
producerEntityId = prodPerfCfg["entity_id"]
- #topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
- topic = testcaseEnv.producerTopicsString
+ topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+ logger.debug("working on topic : " + topic, extra=d)
acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -1207,13 +1211,14 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
matchingConsumerEntityId = None
for consumerEntityId in consumerEntityIdList:
- #consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
- consumerTopic = testcaseEnv.consumerTopicsString
+ consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
if consumerTopic in topic:
matchingConsumerEntityId = consumerEntityId
+ logger.debug("matching consumer entity id found", extra=d)
break
if matchingConsumerEntityId is None:
+ logger.debug("matching consumer entity id NOT found", extra=d)
break
msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
@@ -1229,10 +1234,11 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
producerMsgIdSet = set(producerMsgIdList)
consumerMsgIdSet = set(consumerMsgIdList)
- missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+ consumerDuplicateCount = len(consumerMsgIdList) - len(consumerMsgIdSet)
+ missingUniqConsumerMsgId = system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet)
outfile = open(msgIdMissingInConsumerLogPathName, "w")
- for id in missingMsgIdInConsumer:
+ for id in missingUniqConsumerMsgId:
outfile.write(id + "\n")
outfile.close()
@@ -1241,20 +1247,28 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
- if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+ missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / len(producerMsgIdSet)
+ logger.info("Data loss threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d)
+ logger.warn("Data loss % on topic : " + topic + " : " + str(missingPercentage), extra=d)
+
+ if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 ):
validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
elif (acks == "1"):
- missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
- print "#### missing Percent : ", missingPercentage
- if missingPercentage <= 1:
+ if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent:
validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
- logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+ logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \
+ + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d)
+ else:
+ validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+ logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \
+ + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d)
else:
validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict):
+ logger.debug("#### Inside validate_leader_election_successful", extra=d)
if ( len(leaderDict) > 0 ):
try:
@@ -1545,6 +1559,8 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
+ logger.debug("#### Inside validate_07_08_migrated_data_matched", extra=d)
+
validationStatusDict = testcaseEnv.validationStatusDict
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -1614,6 +1630,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d)
def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName="source"):
+ logger.debug("#### Inside validate_broker_log_segment_checksum", extra=d)
anonLogger.info("================================================")
anonLogger.info("validating merged broker log segment checksums")
@@ -1823,77 +1840,6 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None
partitionId += 1
replicaIndex += 1
-def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv):
- validationStatusDict = testcaseEnv.validationStatusDict
- clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-
- prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
- consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
-
- mismatchCount = 0
-
- for prodPerfCfg in prodPerfCfgList:
- producerEntityId = prodPerfCfg["entity_id"]
- topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
- acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
- logger.debug("request-num-acks [" + acks + "]", extra=d)
-
- consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
- clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
-
- matchingConsumerEntityId = None
- for consumerEntityId in consumerEntityIdList:
- consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
- if consumerTopic in topic:
- matchingConsumerEntityId = consumerEntityId
- break
-
- if matchingConsumerEntityId is None:
- break
-
- producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
- producerLogPathName = producerLogPath + "/producer_performance.log"
- producerMsgIdList = get_message_id(producerLogPathName)
- producerMsgIdSet = set(producerMsgIdList)
- logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d)
- validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
-
- consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
- for logFile in sorted(os.listdir(consumerLogPath)):
- # only process log file: *.log
- if logFile.endswith(".log"):
- consumerLogPathName = consumerLogPath + "/" + logFile
- consumerMsgIdList = get_message_id(consumerLogPathName)
- consumerMsgIdSet = set(consumerMsgIdList)
- missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
- msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname(
- testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + \
- "/" + logFile + "_msg_id_missing_in_consumer.log"
-
- outfile = open(msgIdMissingInConsumerLogPathName, "w")
- for id in missingMsgIdInConsumer:
- outfile.write(id + "\n")
- outfile.close()
-
- logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
- validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet))
-
- if acks == "-1" and len(missingMsgIdInConsumer) > 0:
- mismatchCount += 1
- elif acks == "1" and len(missingMsgIdInConsumer) > 0:
- missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
- logger.debug("missing percentage [" + str(missingPercentage) + "]", extra=d)
- if missingPercentage <= 1:
- logger.warn("Test case (acks == 1) passes with < 1% data loss : [" + \
- str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
- else:
- mismatchCount += 1
-
- if mismatchCount == 0:
- validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
- else:
- validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
-
def get_controller_attributes(systemTestEnv, testcaseEnv):
logger.info("Querying Zookeeper for Controller info ...", extra=d)
@@ -1917,7 +1863,7 @@ def get_controller_attributes(systemTestEnv, testcaseEnv):
"\"JAVA_HOME=" + javaHome,
kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
"-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
- "'get /controller' 2> /dev/null | tail -1\""]
+ "get /controller 2> /dev/null | tail -1\""]
cmdStr = " ".join(cmdStrList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
@@ -2007,6 +1953,8 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source")
return minCommonStartOffsetDict
def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcaseEnv):
+ logger.debug("#### Inside validate_simple_consumer_data_matched_across_replicas", extra=d)
+
validationStatusDict = testcaseEnv.validationStatusDict
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
@@ -2014,101 +1962,100 @@ def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcas
replicaFactor = testcaseEnv.testcaseArgumentsDict["replica_factor"]
numPartition = testcaseEnv.testcaseArgumentsDict["num_partition"]
- # Unique messages from producer on [test_1] : 1500
- # Unique messages from consumer on [test_1] : 1500
+ for consumerEntityId in consumerEntityIdList:
- # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r1.log : 750
- # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r2.log : 750
- # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r3.log : 0
+ # get topic string from multi consumer "entity"
+ topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
- # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r1.log : 0
- # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r2.log : 750
- # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r3.log : 750
+ # the topic string could be multi topics separated by ','
+ topicList = topicStr.split(',')
- # ==================================================
+ for topic in topicList:
+ logger.debug("working on topic : " + topic, extra=d)
+ consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default")
+
+ # keep track of total msg count across replicas for each topic-partition
+ # (should be greater than 0 for passing)
+ totalMsgCounter = 0
+
+ # keep track of the mismatch msg count for each topic-partition
+ # (should be equal to 0 for passing)
+ mismatchCounter = 0
+
+ replicaIdxMsgIdList = []
+ # replicaIdxMsgIdList :
+ # - This is a list of dictionaries of topic-partition (key)
+ # mapping to list of MessageID in that topic-partition (val)
+ # - The list index is mapped to (replicaId - 1)
+ # [
+ # // list index = 0 => replicaId = idx(0) + 1 = 1
+ # {
+ # "topic1-0" : [ "0000000001", "0000000002", "0000000003"],
+ # "topic1-1" : [ "0000000004", "0000000005", "0000000006"]
+ # },
+ # // list index = 1 => replicaId = idx(1) + 1 = 2
+ # {
+ # "topic1-0" : [ "0000000001", "0000000002", "0000000003"],
+ # "topic1-1" : [ "0000000004", "0000000005", "0000000006"]
+ # }
+ # ]
+
+ # initialize replicaIdxMsgIdList
+ j = 0
+ while j < int(replicaFactor):
+ newDict = {}
+ replicaIdxMsgIdList.append(newDict)
+ j += 1
+
+ # retrieve MessageID from all simple consumer log4j files
+ for logFile in sorted(os.listdir(consumerLogPath)):
+
+ if logFile.startswith("simple_consumer_"+topic) and logFile.endswith(".log"):
+ logger.debug("working on file : " + logFile, extra=d)
+ matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile)
+ partitionId = int(matchObj.group(1))
+ replicaIdx = int(matchObj.group(2))
+
+ consumerLogPathName = consumerLogPath + "/" + logFile
+ consumerMsgIdList = get_message_id(consumerLogPathName)
- # Unique messages from producer on [test_2] : 1000
- # Unique messages from consumer on [test_2] : 1000
+ topicPartition = topic + "-" + str(partitionId)
+ replicaIdxMsgIdList[replicaIdx - 1][topicPartition] = consumerMsgIdList
- # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r1.log : 500
- # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r2.log : 0
- # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r3.log : 500
+ logger.info("no. of messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdList)), extra=d)
+ validationStatusDict["No. of messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdList))
- # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r1.log : 500
- # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r2.log : 500
- # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r3.log : 0
+ # print replicaIdxMsgIdList
- mismatchCounter = 0
- for consumerEntityId in consumerEntityIdList:
+ # take the first dictionary of replicaIdxMsgIdList and compare with the rest
+ firstMsgIdDict = replicaIdxMsgIdList[0]
- topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
- consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default")
-
- replicaIdxMsgCountDictList = []
- # replicaIdxMsgCountDictList is being used as follows:
- #
- # the above replica message count will be organized as follows:
- # index of the list would map to the partitionId
- # each element in the list maps to the replicaIdx-MessageCount
- # to validate that :
- # 1. there should be "no. of broker" of non-zero message count and they are equal
- # 2. there should be "no. of broker - replication factor" of zero count
- # [{"1": "750", "2": "750", "3": "0" },
- # {"1": "0" , "2": "750", "3": "750"}]
-
- j = 0
- while j < int(numPartition):
- newDict = {}
- replicaIdxMsgCountDictList.append(newDict)
- j += 1
-
- for logFile in sorted(os.listdir(consumerLogPath)):
-
- if logFile.startswith("simple_consumer_") and logFile.endswith(".log"):
- matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile)
- partitionId = int(matchObj.group(1))
- replicaIdx = int(matchObj.group(2))
-
- consumerLogPathName = consumerLogPath + "/" + logFile
- consumerMsgIdList = get_message_id(consumerLogPathName)
- consumerMsgIdSet = set(consumerMsgIdList)
-
- replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet)
-
- logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
- validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet))
-
- pprint.pprint(replicaIdxMsgCountDictList)
-
- partitionId = 0
- while partitionId < int(numPartition):
- zeroMsgCounter = 0
- nonZeroMsgCounter = 0
- nonZeroMsgValue = -1
-
- for replicaIdx in sorted(replicaIdxMsgCountDictList[partitionId].iterkeys()):
- if replicaIdxMsgCountDictList[partitionId][int(replicaIdx)] == 0:
- zeroMsgCounter += 1
- else:
- if nonZeroMsgValue == -1:
- nonZeroMsgValue = replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]
- else:
- if nonZeroMsgValue != replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]:
- mismatchCounter += 1
- nonZeroMsgCounter += 1
- partitionId += 1
+ # loop through all 'topic-partition' such as topic1-0, topic1-1, ...
+ for topicPartition in sorted(firstMsgIdDict.iterkeys()):
- logger.info("topic " + topic + " : no. of brokers with zero msg count : " + str(zeroMsgCounter), extra=d)
- logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d)
- logger.info("topic " + topic + " : non-zero brokers msg count : " + str(nonZeroMsgValue), extra=d)
+ # compare all replicas' MessageID in corresponding topic-partition
+ for i in range(len(replicaIdxMsgIdList)):
+ # skip the first dictionary
+ if i == 0:
+ totalMsgCounter += len(firstMsgIdDict[topicPartition])
+ continue
- if mismatchCounter == 0 and nonZeroMsgCounter > 0:
- validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
- else:
- validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"
+ totalMsgCounter += len(replicaIdxMsgIdList[i][topicPartition])
+
+ # get the count of mismatch MessageID between first MessageID list and the other lists
+ diffCount = system_test_utils.diff_lists(firstMsgIdDict[topicPartition], replicaIdxMsgIdList[i][topicPartition])
+ mismatchCounter += diffCount
+ logger.info("Mismatch count of topic-partition [" + topicPartition + "] in replica id [" + str(i+1) + "] : " + str(diffCount), extra=d)
+
+ if mismatchCounter == 0 and totalMsgCounter > 0:
+ validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
+ else:
+ validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"
-def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv):
+def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv, replicationUtils):
+ logger.debug("#### Inside validate_data_matched_in_multi_topics_from_single_consumer_producer", extra=d)
+
validationStatusDict = testcaseEnv.validationStatusDict
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -2140,6 +2087,7 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
topicList = topicStr.split(',')
for topic in topicList:
+ consumerDuplicateCount = 0
msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname(
testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
+ "/msg_id_missing_in_consumer_" + topic + ".log"
@@ -2148,10 +2096,11 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
producerMsgIdSet = set(producerMsgIdList)
consumerMsgIdSet = set(consumerMsgIdList)
- missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+ consumerDuplicateCount = len(consumerMsgIdList) -len(consumerMsgIdSet)
+ missingUniqConsumerMsgId = system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet)
outfile = open(msgIdMissingInConsumerLogPathName, "w")
- for id in missingMsgIdInConsumer:
+ for id in missingUniqConsumerMsgId:
outfile.write(id + "\n")
outfile.close()
@@ -2160,14 +2109,21 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
- if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+ missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / len(producerMsgIdSet)
+ logger.info("Data loss threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d)
+ logger.warn("Data loss % on topic : " + topic + " : " + str(missingPercentage), extra=d)
+
+ if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 ):
validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
elif (acks == "1"):
- missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
- print "#### missing Percent : ", missingPercentage
- if missingPercentage <= 1:
+ if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent:
validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
- logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+ logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \
+ + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d)
+ else:
+ validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+ logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \
+ + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d)
else:
validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/replication_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py
index 3e8efad..cfd80b2 100644
--- a/system_test/utils/replication_utils.py
+++ b/system_test/utils/replication_utils.py
@@ -65,3 +65,6 @@ class ReplicationUtils(object):
self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = "\[(.*?)\] .* \[Controller (.*?)\]: " + \
self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"]
+ # Data Loss Percentage Threshold in Ack = 1 cases
+ self.ackOneDataLossThresholdPercent = 5.0
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/system_test_utils.py b/system_test/utils/system_test_utils.py
index 65db5c5..50340f0 100644
--- a/system_test/utils/system_test_utils.py
+++ b/system_test/utils/system_test_utils.py
@@ -21,6 +21,7 @@
# ===================================
import copy
+import difflib
import inspect
import json
import logging
@@ -554,5 +555,80 @@ def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testMo
sys.exit(1)
print
-
+# =================================================
+# lists_diff_count
+#   - count the items left unpaired between both lists
+#   - order is ignored; each duplicate counts separately
+#   - input lists won't be changed
+# =================================================
+def lists_diff_count(a, b):
+    onlyInB = list(b)
+    onlyInA = []
+    for item in a:
+        try:
+            onlyInB.remove(item)
+        except ValueError:  # item has no (remaining) partner in b
+            onlyInA.append(item)
+
+    if onlyInA:
+        print("#### Mismatch MessageID")
+        print(onlyInA)
+
+    return len(onlyInB) + len(onlyInA)
+
+# =================================================
+# subtract_list
+#   - return a new list holding the items of mainList
+#     minus one occurrence per item in listToSubtract
+#   - both inputs may be any iterable, in any order
+#   - inputs are never modified
+# =================================================
+def subtract_list(mainList, listToSubtract):
+    remainingList = list(mainList)
+    for item in listToSubtract:
+        try:
+            remainingList.remove(item)
+        except ValueError:
+            pass  # item not (or no longer) in remainingList
+    return remainingList
+
+# =================================================
+# diff_lists
+#   - compare 2 lists with difflib.Differ and return
+#     the total no. of mismatched items from both lists
+#   - a mismatch is an item that is missing from one
+#     list or sits at a different position (ordering)
+#   - Differ's '? ' hint lines are NOT counted
+#
+#   sample lists:
+#     a = ['8','4','3','2','1']
+#     b = ['8','3','4','2','1']
+#
+#   difflib will return the following:
+#       8
+#     + 3
+#       4
+#     - 3
+#       2
+#       1
+#
+#   diff_lists(a,b) returns 2 and logs (at debug level):
+#     #### only in seq 2 : + 3
+#     #### only in seq 1 : - 3
+# =================================================
+def diff_lists(a, b):
+    mismatchCount = 0
+    # NOTE: not named 'd' -- a local 'd' would shadow the logging 'extra' object used below
+    differ = difflib.Differ()
+
+    for item in differ.compare(a, b):
+        marker = item[0:1]
+        if marker == '-':
+            mismatchCount += 1
+            logger.debug("#### only in seq 1 : " + item, extra=d)
+        elif marker == '+':
+            mismatchCount += 1
+            logger.debug("#### only in seq 2 : " + item, extra=d)
+
+    return mismatchCount