You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/28 00:59:07 UTC
git commit: kafka-819;
System Test : Add validation of log segment index to offset;
patched by John Fung; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 c5e354d3a -> f570cce1f
kafka-819; System Test : Add validation of log segment index to offset; patched by John Fung; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f570cce1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f570cce1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f570cce1
Branch: refs/heads/0.8
Commit: f570cce1f4bec4ce50b4b1878a804b725f316b91
Parents: c5e354d
Author: John Fung <fu...@gmail.com>
Authored: Wed Mar 27 16:58:54 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 27 16:58:54 2013 -0700
----------------------------------------------------------------------
.../replication_testsuite/replica_basic_test.py | 2 +
system_test/utils/kafka_system_test_utils.py | 74 +++++++++++++++
2 files changed, 76 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f570cce1/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index ce29240..40c1157 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -433,6 +433,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+
+ kafka_system_test_utils.validate_index_log(self.systemTestEnv, self.testcaseEnv)
# =============================================
# draw graphs
http://git-wip-us.apache.org/repos/asf/kafka/blob/f570cce1/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 9e58624..dd082f5 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -2129,4 +2129,78 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
def _iter_broker_index_files(localLogSegmentPath):
    """Yield the path name of every *.index file found one level below
    localLogSegmentPath, i.e. inside each topic-partition directory.

    Expected layout of localLogSegmentPath, e.g.
    .../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs

        |- test_1-0
            |- 00000000000000000000.index
            |- 00000000000000000000.log
            |- 00000000000000000020.index
            |- 00000000000000000020.log
            |- . . .
        |- test_1-1
            |- 00000000000000000000.index
            |- 00000000000000000000.log
            |- . . .

    Files inside a topic-partition directory are visited in sorted name
    order; the topic-partition directories themselves are visited in
    os.listdir() order.
    """
    for topicPartition in os.listdir(localLogSegmentPath):
        topicPartitionPath = localLogSegmentPath + "/" + topicPartition
        # anything that is not a directory is not a topic-partition
        if not os.path.isdir(topicPartitionPath):
            continue

        for logFile in sorted(os.listdir(topicPartitionPath)):
            # only process index file: *.index
            if logFile.endswith(".index"):
                yield topicPartitionPath + "/" + logFile


def _index_file_has_mismatch(hostname, kafkaRunClassBin, offsetLogSegmentPathName):
    """Run kafka.tools.DumpLogSegments --verify-index-only on one index file
    and return True if its output contains a "Mismatches in :" line.

    The first line containing "Mismatches in :" and every output line
    following it are written to the debug log.
    """
    # NOTE(review): the command is executed through "ssh <hostname>" but the
    # path passed to --file is the local testcase log path; presumably the
    # broker host can see the same path - confirm.
    cmdStrList = ["ssh " + hostname,
                  kafkaRunClassBin + " kafka.tools.DumpLogSegments",
                  " --file " + offsetLogSegmentPathName,
                  "--verify-index-only 2>&1"]
    cmdStr = " ".join(cmdStrList)

    showMismatchedIndexOffset = False

    logger.debug("executing command [" + cmdStr + "]", extra=d)
    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
    for line in subproc.stdout.readlines():
        line = line.rstrip('\n')
        if showMismatchedIndexOffset:
            # everything after the mismatch header is logged verbatim
            logger.debug("#### [" + line + "]", extra=d)
        elif "Mismatches in :" in line:
            logger.debug("#### error found [" + line + "]", extra=d)
            showMismatchedIndexOffset = True

    return showMismatchedIndexOffset


def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
    """Verify the log segment index files of every broker in a cluster.

    For each broker entity whose "cluster_name" equals clusterName, every
    *.index file under the broker's log segment directory in the testcase
    log area is checked with kafka.tools.DumpLogSegments --verify-index-only.
    Each index file whose output reports "Mismatches in :" counts as one
    failure.

    The outcome is recorded in testcaseEnv.validationStatusDict under the key
    "Validate index log in cluster [<clusterName>]" as "PASSED" (no failure)
    or "FAILED" (at least one failure). Nothing is returned.

    systemTestEnv : provides clusterEntityConfigDictList
    testcaseEnv   : provides testcaseConfigsList and validationStatusDict
    clusterName   : name of the cluster whose brokers are validated
    """
    logger.debug("#### Inside validate_index_log", extra=d)

    failureCount = 0
    tcConfigsList = testcaseEnv.testcaseConfigsList
    validationStatusDict = testcaseEnv.validationStatusDict
    clusterConfigList = systemTestEnv.clusterEntityConfigDictList
    allBrokerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")

    # loop through all brokers
    for brokerEntityId in brokerEntityIdList:
        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
        # => remoteLogSegmentDir   : kafka_server_4_logs
        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
        remoteLogSegmentDir = os.path.basename(remoteLogSegmentPathName)
        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
        localLogSegmentPath = logPathName + "/" + remoteLogSegmentDir
        kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "kafka_home")
        hostname = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "hostname")
        kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"

        # verify every index file of every topic-partition of this broker
        for offsetLogSegmentPathName in _iter_broker_index_files(localLogSegmentPath):
            if _index_file_has_mismatch(hostname, kafkaRunClassBin, offsetLogSegmentPathName):
                failureCount += 1

    # a single status entry covers all brokers of the cluster
    validationKey = "Validate index log in cluster [" + clusterName + "]"
    if failureCount == 0:
        validationStatusDict[validationKey] = "PASSED"
    else:
        validationStatusDict[validationKey] = "FAILED"