You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/11/16 20:38:16 UTC
svn commit: r1410547 - in /incubator/kafka/branches/0.8/system_test: ./
replication_testsuite/ replication_testsuite/testcase_0151/
replication_testsuite/testcase_0152/ replication_testsuite/testcase_0153/
replication_testsuite/testcase_0154/ replicati...
Author: junrao
Date: Fri Nov 16 19:38:14 2012
New Revision: 1410547
URL: http://svn.apache.org/viewvc?rev=1410547&view=rev
Log:
System Test - Log Retention Cases should wait longer before getting the common starting offset in replica log segments; patched by John Fung; reviewed by Jun Rao; KAFKA-605
Added:
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
Modified:
incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json
incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json
incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Fri Nov 16 19:38:14 2012
@@ -123,6 +123,12 @@ class ReplicaBasicTest(ReplicationUtils,
logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"]
except:
pass
+ consumerMultiTopicsMode = "false"
+ try:
+ consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"]
+ except:
+ pass
+
# initialize self.testcaseEnv with user-defined environment variables (product specific)
self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
@@ -363,8 +369,8 @@ class ReplicaBasicTest(ReplicationUtils,
# =============================================
minStartingOffsetDict = None
if logRetentionTest.lower() == "true":
- self.anonLogger.info("sleeping for 10s before collecting logs")
- time.sleep(10)
+ self.anonLogger.info("sleeping for 60s to make sure log truncation is completed")
+ time.sleep(60)
kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
minStartingOffsetDict = kafka_system_test_utils.getMinCommonStartingOffset(self.systemTestEnv, self.testcaseEnv)
@@ -374,10 +380,19 @@ class ReplicaBasicTest(ReplicationUtils,
# =============================================
# starting debug consumer
# =============================================
- self.log_message("starting debug consumers in the background")
- kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
- self.anonLogger.info("sleeping for 10s")
- time.sleep(10)
+ if consumerMultiTopicsMode.lower() == "false":
+ self.log_message("starting debug consumers in the background")
+ kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
+ self.anonLogger.info("sleeping for 10s")
+ time.sleep(10)
+
+ # =============================================
+ # starting console consumer
+ # =============================================
+ if logRetentionTest.lower() == "false":
+ self.log_message("starting consumer in the background")
+ kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+ time.sleep(1)
# =============================================
# this testcase is completed - stop all entities
@@ -396,7 +411,7 @@ class ReplicaBasicTest(ReplicationUtils,
# collect logs from remote hosts
# =============================================
kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-
+
# =============================================
# validate the data matched and checksum
# =============================================
@@ -405,10 +420,13 @@ class ReplicaBasicTest(ReplicationUtils,
if logRetentionTest.lower() == "true":
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+ elif consumerMultiTopicsMode.lower() == "true":
+ kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
else:
- #kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
# =============================================
# draw graphs
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "-1",
"sync":"true",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "-1",
"sync":"false",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "1",
"sync":"true",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "1",
"sync":"false",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "-1",
"sync":"true",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "-1",
"sync":"false",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "1",
"sync":"true",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
"message": "100",
"request-num-acks": "1",
"sync":"false",
- "producer-retry-backoff-ms": "2500",
+ "producer-retry-backoff-ms": "3500",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json?rev=1410547&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json Fri Nov 16 19:38:14 2012
@@ -0,0 +1,107 @@
+{
+ "cluster_config": [
+ {
+ "entity_id": "0",
+ "hostname": "localhost",
+ "role": "zookeeper",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9900"
+ },
+
+
+ {
+ "entity_id": "1",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9901"
+ },
+ {
+ "entity_id": "2",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9902"
+ },
+ {
+ "entity_id": "3",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9903"
+ },
+ {
+ "entity_id": "4",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9904"
+ },
+ {
+ "entity_id": "5",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9905"
+ },
+ {
+ "entity_id": "6",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9906"
+ },
+ {
+ "entity_id": "7",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9907"
+ },
+ {
+ "entity_id": "8",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9908"
+ },
+
+
+ {
+ "entity_id": "9",
+ "hostname": "localhost",
+ "role": "producer_performance",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9909"
+ },
+ {
+ "entity_id": "10",
+ "hostname": "localhost",
+ "role": "console_consumer",
+ "cluster_name": "source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9910"
+ }
+ ]
+}
Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json?rev=1410547&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json Fri Nov 16 19:38:14 2012
@@ -0,0 +1,129 @@
+{
+ "description": {"01":"To Test : 'Leader Failure in Replication'",
+ "02":"Produce and consume messages to 300 topics - 4 partitions.",
+ "03":"This test sends messages to 3 replicas",
+ "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+ "05":"Restart the terminated broker",
+ "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+ "07":"At the end it verifies the log size and contents",
+ "08":"Use a consumer to verify no message loss.",
+ "09":"Producer dimensions : mode:sync, acks:-1, comp:0",
+ "10":"Log segment size : 1048576"
+ },
+ "testcase_args": {
+ "broker_type": "leader",
+ "bounce_broker": "false",
+ "replica_factor": "8",
+ "num_partition": "2",
+ "num_iteration": "1",
+ "producer_multi_topics_mode": "true",
+ "consumer_multi_topics_mode": "true",
+ "sleep_seconds_between_producer_calls": "5",
+ "message_producing_free_time_sec": "15",
+ "num_messages_to_produce_per_producer_call": "50"
+ },
+ "entities": [
+ {
+ "entity_id": "0",
+ "clientPort": "2188",
+ "dataDir": "/tmp/zookeeper_0",
+ "log_filename": "zookeeper_2188.log",
+ "config_filename": "zookeeper_2188.properties"
+ },
+ {
+ "entity_id": "1",
+ "port": "9091",
+ "brokerid": "1",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_1_logs",
+ "log_filename": "kafka_server_9091.log",
+ "config_filename": "kafka_server_9091.properties"
+ },
+ {
+ "entity_id": "2",
+ "port": "9092",
+ "brokerid": "2",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_2_logs",
+ "log_filename": "kafka_server_9092.log",
+ "config_filename": "kafka_server_9092.properties"
+ },
+ {
+ "entity_id": "3",
+ "port": "9093",
+ "brokerid": "3",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_3_logs",
+ "log_filename": "kafka_server_9093.log",
+ "config_filename": "kafka_server_9093.properties"
+ },
+ {
+ "entity_id": "4",
+ "port": "9094",
+ "brokerid": "4",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_4_logs",
+ "log_filename": "kafka_server_9094.log",
+ "config_filename": "kafka_server_9094.properties"
+ },
+ {
+ "entity_id": "5",
+ "port": "9095",
+ "brokerid": "5",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_5_logs",
+ "log_filename": "kafka_server_9095.log",
+ "config_filename": "kafka_server_9095.properties"
+ },
+ {
+ "entity_id": "6",
+ "port": "9096",
+ "brokerid": "6",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_6_logs",
+ "log_filename": "kafka_server_9096.log",
+ "config_filename": "kafka_server_9096.properties"
+ },
+ {
+ "entity_id": "7",
+ "port": "9097",
+ "brokerid": "7",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_7_logs",
+ "log_filename": "kafka_server_9097.log",
+ "config_filename": "kafka_server_9097.properties"
+ },
+ {
+ "entity_id": "8",
+ "port": "9098",
+ "brokerid": "8",
+ "log.file.size": "1048576",
+ "log.dir": "/tmp/kafka_server_8_logs",
+ "log_filename": "kafka_server_9098.log",
+ "config_filename": "kafka_server_9098.properties"
+ },
+ {
+ "entity_id": "9",
+ "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+ "threads": "5",
+ "compression-codec": "0",
+ "message-size": "500",
+ "message": "500",
+ "request-num-acks": "-1",
+ "producer-retry-backoff-ms": "3500",
+ "producer-num-retries": "5",
+ "async":"false",
+ "log_filename": "producer_performance_9.log",
+ "config_filename": "producer_performance_9.properties"
+ },
+ {
+ "entity_id": "10",
+ "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+ "groupid": "mytestgroup",
+ "consumer-timeout-ms": "60000",
+ "zookeeper": "localhost:2188",
+ "log_filename": "console_consumer_10.log",
+ "config_filename": "console_consumer_10.properties"
+ }
+ ]
+}
Modified: incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json (original)
+++ incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json Fri Nov 16 19:38:14 2012
@@ -99,7 +99,9 @@
"testcase_4015",
"testcase_4016",
"testcase_4017",
- "testcase_4018"
+ "testcase_4018",
+
+ "testcase_9051"
],
"MigrationToolTest" : [
Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Fri Nov 16 19:38:14 2012
@@ -996,47 +996,43 @@ def create_topic(systemTestEnv, testcase
prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
for prodPerfCfg in prodPerfCfgList:
- topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
+ topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
createTopicBin = kafkaHome + "/bin/kafka-create-topic.sh"
- logger.debug("zkEntityId : " + zkEntityId, extra=d)
+ logger.debug("zkEntityId : " + zkEntityId, extra=d)
logger.debug("createTopicBin : " + createTopicBin, extra=d)
+ zkConnectStr = ""
+ topicsList = topicsStr.split(',')
+
if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0:
- logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + "]", extra=d)
+ zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+ elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
+ zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+ else:
+ raise Exception("Empty zkConnectStr found")
+
+ for topic in topicsList:
+ logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d)
cmdList = ["ssh " + zkHost,
"'JAVA_HOME=" + javaHome,
createTopicBin,
" --topic " + topic,
- " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+ " --zookeeper " + zkConnectStr,
" --replica " + testcaseEnv.testcaseArgumentsDict["replica_factor"],
" --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
testcaseEnv.testCaseBaseDir + "/logs/create_source_cluster_topic.log'"]
-
+
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(cmdStr)
- if len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
- logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + "]", extra=d)
- cmdList = ["ssh " + zkHost,
- "'JAVA_HOME=" + javaHome,
- createTopicBin,
- " --topic " + topic,
- " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"],
- " --replica " + testcaseEnv.testcaseArgumentsDict["replica_factor"],
- " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
- testcaseEnv.testCaseBaseDir + "/logs/create_target_cluster_topic.log'"]
- cmdStr = " ".join(cmdList)
- logger.debug("executing command: [" + cmdStr + "]", extra=d)
- subproc = system_test_utils.sys_call_return_subproc(cmdStr)
-
-def get_message_id(logPathName):
+def get_message_id(logPathName, topic=""):
logLines = open(logPathName, "r").readlines()
messageIdList = []
@@ -1044,8 +1040,12 @@ def get_message_id(logPathName):
if not "MessageID" in line:
continue
else:
- matchObj = re.match('.*MessageID:(.*?):', line)
- messageIdList.append( matchObj.group(1) )
+ matchObj = re.match('.*Topic:(.*?):.*:MessageID:(.*?):', line)
+ if len(topic) == 0:
+ messageIdList.append( matchObj.group(2) )
+ else:
+ if topic == matchObj.group(1):
+ messageIdList.append( matchObj.group(2) )
return messageIdList
@@ -1949,14 +1949,75 @@ def validate_simple_consumer_data_matche
logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d)
logger.info("topic " + topic + " : non-zero brokers msg count : " + str(nonZeroMsgValue), extra=d)
- if mismatchCounter == 0:
+ if mismatchCounter == 0 and nonZeroMsgCounter > 0:
validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
else:
validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"
+def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv):
+ validationStatusDict = testcaseEnv.validationStatusDict
+ clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+ prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+ consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+
+ for prodPerfCfg in prodPerfCfgList:
+ producerEntityId = prodPerfCfg["entity_id"]
+ topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+ acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
+
+ consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+ matchingConsumerEntityId = None
+ for consumerEntityId in consumerEntityIdList:
+ consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+ if consumerTopic in topicStr:
+ matchingConsumerEntityId = consumerEntityId
+ break
+ if matchingConsumerEntityId is None:
+ break
+
+ producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
+ producerLogPathName = producerLogPath + "/producer_performance.log"
+
+ consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
+ consumerLogPathName = consumerLogPath + "/console_consumer.log"
+
+ topicList = topicStr.split(',')
+ for topic in topicList:
+ msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname(
+ testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
+ + "/msg_id_missing_in_consumer_" + topic + ".log"
+ producerMsgIdList = get_message_id(producerLogPathName, topic)
+ consumerMsgIdList = get_message_id(consumerLogPathName, topic)
+ producerMsgIdSet = set(producerMsgIdList)
+ consumerMsgIdSet = set(consumerMsgIdList)
+
+ missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+
+ outfile = open(msgIdMissingInConsumerLogPathName, "w")
+ for id in missingMsgIdInConsumer:
+ outfile.write(id + "\n")
+ outfile.close()
+
+ logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d)
+ logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
+ validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
+ validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
+
+ if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+ validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+ elif (acks == "1"):
+ missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
+ print "#### missing Percent : ", missingPercentage
+ if missingPercentage <= 1:
+ validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+ logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+ else:
+ validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+ logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)