You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/11/16 20:38:16 UTC

svn commit: r1410547 - in /incubator/kafka/branches/0.8/system_test: ./ replication_testsuite/ replication_testsuite/testcase_0151/ replication_testsuite/testcase_0152/ replication_testsuite/testcase_0153/ replication_testsuite/testcase_0154/ replicati...

Author: junrao
Date: Fri Nov 16 19:38:14 2012
New Revision: 1410547

URL: http://svn.apache.org/viewvc?rev=1410547&view=rev
Log:
System Test - Log Retention Cases should wait longer before getting the common starting offset in replica log segments; patched by John Fung; reviewed by Jun Rao; KAFKA-605

Added:
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
Modified:
    incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json
    incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Fri Nov 16 19:38:14 2012
@@ -123,6 +123,12 @@ class ReplicaBasicTest(ReplicationUtils,
                     logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"]
                 except:
                     pass
+                consumerMultiTopicsMode = "false"
+                try:
+                    consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"]
+                except:
+                    pass
+
 
                 # initialize self.testcaseEnv with user-defined environment variables (product specific)
                 self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
@@ -363,8 +369,8 @@ class ReplicaBasicTest(ReplicationUtils,
                 # =============================================
                 minStartingOffsetDict = None
                 if logRetentionTest.lower() == "true":
-                    self.anonLogger.info("sleeping for 10s before collecting logs")
-                    time.sleep(10)
+                    self.anonLogger.info("sleeping for 60s to make sure log truncation is completed")
+                    time.sleep(60)
                     kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
 
                     minStartingOffsetDict = kafka_system_test_utils.getMinCommonStartingOffset(self.systemTestEnv, self.testcaseEnv)
@@ -374,10 +380,19 @@ class ReplicaBasicTest(ReplicationUtils,
                 # =============================================
                 # starting debug consumer
                 # =============================================
-                self.log_message("starting debug consumers in the background")
-                kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
-                self.anonLogger.info("sleeping for 10s")
-                time.sleep(10)
+                if consumerMultiTopicsMode.lower() == "false":
+                    self.log_message("starting debug consumers in the background")
+                    kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
+                    self.anonLogger.info("sleeping for 10s")
+                    time.sleep(10)
+
+                # =============================================
+                # starting console consumer
+                # =============================================
+                if logRetentionTest.lower() == "false":
+                    self.log_message("starting consumer in the background")
+                    kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+                    time.sleep(1)
                     
                 # =============================================
                 # this testcase is completed - stop all entities
@@ -396,7 +411,7 @@ class ReplicaBasicTest(ReplicationUtils,
                 # collect logs from remote hosts
                 # =============================================
                 kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-    
+
                 # =============================================
                 # validate the data matched and checksum
                 # =============================================
@@ -405,10 +420,13 @@ class ReplicaBasicTest(ReplicationUtils,
                 if logRetentionTest.lower() == "true":
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+                elif consumerMultiTopicsMode.lower() == "true":
+                    kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                    kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
                 else:
-                    #kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                    kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
 
                 # =============================================
                 # draw graphs

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0151/testcase_0151_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "-1",
       "sync":"true",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0152/testcase_0152_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "-1",
       "sync":"false",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0153/testcase_0153_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "1",
       "sync":"true",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0154/testcase_0154_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "1",
       "sync":"false",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0155/testcase_0155_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "-1",
       "sync":"true",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0156/testcase_0156_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "-1",
       "sync":"false",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0157/testcase_0157_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "1",
       "sync":"true",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0158/testcase_0158_properties.json Fri Nov 16 19:38:14 2012
@@ -65,7 +65,7 @@
       "message": "100",
       "request-num-acks": "1",
       "sync":"false",
-      "producer-retry-backoff-ms": "2500",
+      "producer-retry-backoff-ms": "3500",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json?rev=1410547&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json Fri Nov 16 19:38:14 2012
@@ -0,0 +1,107 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9900"
+        },
+
+
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9901"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9902"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9903"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9904"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9905"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9906"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9907"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9908"
+        },
+
+
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9909"
+        },
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9910"
+        }
+    ]
+}

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json?rev=1410547&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json Fri Nov 16 19:38:14 2012
@@ -0,0 +1,129 @@
+{
+  "description": {"01":"To Test : 'Leader Failure in Replication'",
+                  "02":"Produce and consume messages to 20 topics - 2 partitions.",
+                  "03":"This test sends messages to 8 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "10":"Log segment size    : 1048576"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "bounce_broker": "false",
+    "replica_factor": "8",
+    "num_partition": "2",
+    "num_iteration": "1",
+    "producer_multi_topics_mode": "true",
+    "consumer_multi_topics_mode": "true",
+    "sleep_seconds_between_producer_calls": "5",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "port": "9094",
+      "brokerid": "4",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "log_filename": "kafka_server_9094.log",
+      "config_filename": "kafka_server_9094.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9095",
+      "brokerid": "5",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "log_filename": "kafka_server_9095.log",
+      "config_filename": "kafka_server_9095.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9096",
+      "brokerid": "6",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "log_filename": "kafka_server_9096.log",
+      "config_filename": "kafka_server_9096.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9097",
+      "brokerid": "7",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "log_filename": "kafka_server_9097.log",
+      "config_filename": "kafka_server_9097.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9098",
+      "brokerid": "8",
+      "log.file.size": "1048576",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "log_filename": "kafka_server_9098.log",
+      "config_filename": "kafka_server_9098.properties"
+    },
+    {
+      "entity_id": "9",
+      "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "-1",
+      "producer-retry-backoff-ms": "3500",
+      "producer-num-retries": "5",
+      "async":"false",
+      "log_filename": "producer_performance_9.log",
+      "config_filename": "producer_performance_9.properties"
+    },
+    {
+      "entity_id": "10",
+      "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "zookeeper": "localhost:2188",
+      "log_filename": "console_consumer_10.log",
+      "config_filename": "console_consumer_10.properties"
+    }
+  ]
+}

Modified: incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json (original)
+++ incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json Fri Nov 16 19:38:14 2012
@@ -99,7 +99,9 @@
         "testcase_4015",
         "testcase_4016",
         "testcase_4017",
-        "testcase_4018"
+        "testcase_4018",
+
+        "testcase_9051"
     ],
 
     "MigrationToolTest"  : [

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1410547&r1=1410546&r2=1410547&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Fri Nov 16 19:38:14 2012
@@ -996,47 +996,43 @@ def create_topic(systemTestEnv, testcase
     prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
 
     for prodPerfCfg in prodPerfCfgList:
-        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
+        topicsStr       = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
         zkEntityId      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
         zkHost          = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
         kafkaHome       = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
         javaHome        = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
         createTopicBin  = kafkaHome + "/bin/kafka-create-topic.sh"
 
-        logger.debug("zkEntityId : " + zkEntityId, extra=d)
+        logger.debug("zkEntityId     : " + zkEntityId, extra=d)
         logger.debug("createTopicBin : " + createTopicBin, extra=d)
 
+        zkConnectStr = ""
+        topicsList   = topicsStr.split(',')
+
         if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0:
-            logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + "]", extra=d) 
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+        elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+        else:
+            raise Exception("Empty zkConnectStr found")
+
+        for topic in topicsList:
+            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) 
             cmdList = ["ssh " + zkHost,
                        "'JAVA_HOME=" + javaHome,
                        createTopicBin,
                        " --topic "     + topic,
-                       " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                       " --zookeeper " + zkConnectStr,
                        " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
                        " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
                        testcaseEnv.testCaseBaseDir + "/logs/create_source_cluster_topic.log'"]
-
+    
             cmdStr = " ".join(cmdList)
             logger.debug("executing command: [" + cmdStr + "]", extra=d)
             subproc = system_test_utils.sys_call_return_subproc(cmdStr)
 
-        if len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
-            logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + "]", extra=d) 
-            cmdList = ["ssh " + zkHost,
-                       "'JAVA_HOME=" + javaHome,
-                       createTopicBin,
-                       " --topic "     + topic,
-                       " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"],
-                       " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
-                       " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
-                       testcaseEnv.testCaseBaseDir + "/logs/create_target_cluster_topic.log'"]
 
-            cmdStr = " ".join(cmdList)
-            logger.debug("executing command: [" + cmdStr + "]", extra=d)
-            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
-
-def get_message_id(logPathName):
+def get_message_id(logPathName, topic=""):
     logLines      = open(logPathName, "r").readlines()
     messageIdList = []
 
@@ -1044,8 +1040,12 @@ def get_message_id(logPathName):
         if not "MessageID" in line:
             continue
         else:
-            matchObj = re.match('.*MessageID:(.*?):', line)
-            messageIdList.append( matchObj.group(1) )
+            matchObj = re.match('.*Topic:(.*?):.*:MessageID:(.*?):', line)
+            if len(topic) == 0:
+                messageIdList.append( matchObj.group(2) )
+            else:
+                if topic == matchObj.group(1):
+                    messageIdList.append( matchObj.group(2) )
 
     return messageIdList
 
@@ -1949,14 +1949,75 @@ def validate_simple_consumer_data_matche
             logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d)
             logger.info("topic " + topic + " : non-zero brokers msg count             : " + str(nonZeroMsgValue), extra=d)
 
-        if mismatchCounter == 0:
+        if mismatchCounter == 0 and nonZeroMsgCounter > 0:
             validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
         else:
             validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"
 
 
+def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv):
+    validationStatusDict        = testcaseEnv.validationStatusDict
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+
+    for prodPerfCfg in prodPerfCfgList:
+        producerEntityId = prodPerfCfg["entity_id"]
+        topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        acks     = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
+
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
 
+        matchingConsumerEntityId = None
+        for consumerEntityId in consumerEntityIdList:
+            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            if consumerTopic in topicStr:
+                matchingConsumerEntityId = consumerEntityId
+                break
 
+        if matchingConsumerEntityId is None:
+            break
+
+        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
+        producerLogPathName = producerLogPath + "/producer_performance.log"
+
+        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
+        consumerLogPathName = consumerLogPath + "/console_consumer.log"
+
+        topicList = topicStr.split(',')
+        for topic in topicList:
+            msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
+                                                testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
+                                                + "/msg_id_missing_in_consumer_" + topic + ".log"
+            producerMsgIdList  = get_message_id(producerLogPathName, topic)
+            consumerMsgIdList  = get_message_id(consumerLogPathName, topic)
+            producerMsgIdSet   = set(producerMsgIdList)
+            consumerMsgIdSet   = set(consumerMsgIdList)
+
+            missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+
+            outfile = open(msgIdMissingInConsumerLogPathName, "w")
+            for id in missingMsgIdInConsumer:
+                outfile.write(id + "\n")
+            outfile.close()
+
+            logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
+            logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
+            validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
+            validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
+
+            if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+            elif (acks == "1"):
+                missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
+                print "#### missing Percent : ", missingPercentage
+                if missingPercentage <= 1:
+                    validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+                    logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+            else:
+                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+                logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)