You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/28 00:59:07 UTC

git commit: kafka-819; System Test : Add validation of log segment index to offset; patched by John Fung; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 c5e354d3a -> f570cce1f


kafka-819; System Test : Add validation of log segment index to offset; patched by John Fung; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f570cce1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f570cce1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f570cce1

Branch: refs/heads/0.8
Commit: f570cce1f4bec4ce50b4b1878a804b725f316b91
Parents: c5e354d
Author: John Fung <fu...@gmail.com>
Authored: Wed Mar 27 16:58:54 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 27 16:58:54 2013 -0700

----------------------------------------------------------------------
 .../replication_testsuite/replica_basic_test.py    |    2 +
 system_test/utils/kafka_system_test_utils.py       |   74 +++++++++++++++
 2 files changed, 76 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f570cce1/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index ce29240..40c1157 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -433,6 +433,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+
+                kafka_system_test_utils.validate_index_log(self.systemTestEnv, self.testcaseEnv)
  
                 # =============================================
                 # draw graphs

http://git-wip-us.apache.org/repos/asf/kafka/blob/f570cce1/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 9e58624..dd082f5 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -2129,4 +2129,78 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
                 logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
 
 
+def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
+    logger.debug("#### Inside validate_index_log", extra=d)
+
+    failureCount         = 0
+    brokerLogCksumDict   = {}
+    testCaseBaseDir      = testcaseEnv.testCaseBaseDir
+    tcConfigsList        = testcaseEnv.testcaseConfigsList
+    validationStatusDict = testcaseEnv.validationStatusDict
+    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
+    allBrokerConfigList  = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
+    brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")
+
+    # loop through all brokers
+    for brokerEntityId in brokerEntityIdList:
+        logCksumDict   = {}
+        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
+        # => remoteLogSegmentDir   : kafka_server_4_logs
+        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
+        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
+        logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir
+        kafkaHome                = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "kafka_home")
+        hostname                 = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "hostname")
+        kafkaRunClassBin         = kafkaHome + "/bin/kafka-run-class.sh"
+
+        # localLogSegmentPath :
+        # .../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs
+        #   |- test_1-0
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+        #   |- test_1-1
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+
+        # loop through all topicPartition directories such as : test_1-0, test_1-1, ...
+        for topicPartition in os.listdir(localLogSegmentPath):
+            # found a topic-partition directory
+            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
+
+                # log segment files are located in : localLogSegmentPath + "/" + topicPartition
+                # sort the log segment files under each topic-partition and verify index
+                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)):
+                    # only process index file: *.index
+                    if logFile.endswith(".index"):
+                        offsetLogSegmentPathName = localLogSegmentPath + "/" + topicPartition + "/" + logFile
+                        cmdStrList = ["ssh " + hostname,
+                                      kafkaRunClassBin + " kafka.tools.DumpLogSegments",
+                                      " --file " + offsetLogSegmentPathName,
+                                      "--verify-index-only 2>&1"]
+                        cmdStr     = " ".join(cmdStrList)
+
+                        showMismatchedIndexOffset = False
+
+                        logger.debug("executing command [" + cmdStr + "]", extra=d)
+                        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+                        for line in subproc.stdout.readlines():
+                            line = line.rstrip('\n')
+                            if showMismatchedIndexOffset:
+                                logger.debug("#### [" + line + "]", extra=d)
+                            elif "Mismatches in :" in line:
+                                logger.debug("#### error found [" + line + "]", extra=d)
+                                failureCount += 1
+                                showMismatchedIndexOffset = True
+
+    if failureCount == 0:
+        validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "PASSED"
+    else:
+        validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED"