Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/10 20:05:54 UTC

svn commit: r1359812 - in /incubator/kafka/branches/0.8/system_test/broker_failure: ./ bin/ config/

Author: junrao
Date: Tue Jul 10 18:05:54 2012
New Revision: 1359812

URL: http://svn.apache.org/viewvc?rev=1359812&view=rev
Log:
broker failure system test broken on replication branch; patched by John Fung; reviewed by Joel Koshy and Jun Rao; KAFKA-306

Modified:
    incubator/kafka/branches/0.8/system_test/broker_failure/README
    incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/README
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/README?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/README (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/README Tue Jul 10 18:05:54 2012
@@ -1,23 +1,34 @@
-This script performs broker failure tests with the following
-setup in a single local machine:
-
-1. A cluster of Kafka source brokers
-2. A cluster of Kafka mirror brokers with embedded consumers in 
-   point-to-point mode
-3. An independent ConsoleConsumer in publish/subcribe mode to
+** Please note that the following commands should be executed
+   after downloading the kafka source code to build all the
+   required binaries:
+   1. <kafka install dir>/ $ ./sbt update
+   2. <kafka install dir>/ $ ./sbt package
+
+   Now you are ready to follow the steps below.
+
+This script performs broker failure tests in an environment with
+Mirrored Source & Target clusters on a single machine:
+
+1. Start a cluster of Kafka source brokers
+2. Start a cluster of Kafka target brokers
+3. Start one or more Mirror Makers to mirror data between the
+   source and target clusters
+4. A producer produces batches of messages to the SOURCE brokers
+   in the background
+5. The Kafka SOURCE brokers, TARGET brokers and Mirror Makers are
+   terminated in a round-robin fashion; after each termination
+   the test waits for the consumer to catch up.
+6. Repeat step 5 as many times as specified in the script
+7. An independent ConsoleConsumer in publish/subscribe mode to
    consume messages from the SOURCE brokers cluster
-4. An independent ConsoleConsumer in publish/subcribe mode to
-   consume messages from the MIRROR brokers cluster
-5. A producer produces batches of messages to the SOURCE brokers
-6. One of the Kafka SOURCE or MIRROR brokers in the cluster will
-   be randomly terminated and waiting for the consumer to catch up.
-7. Repeat Step 4 & 5 as many times as specified in the script
+8. An independent ConsoleConsumer in publish/subscribe mode to
+   consume messages from the TARGET brokers cluster
 
 Expected results:
 ==================
 There should not be any discrepancies by comparing the unique 
 message checksums from the source ConsoleConsumer and the 
-mirror ConsoleConsumer.
+target ConsoleConsumer.
 
 Notes:
 ==================
@@ -26,17 +37,36 @@ The number of Kafka SOURCE brokers can b
 2. Make sure that there are corresponding number of prop files:
    $base_dir/config/server_source{1..4}.properties
 
-The number of Kafka MIRROR brokers can be increased as follows:
+The number of Kafka TARGET brokers can be increased as follows:
 1. Update the value of $num_kafka_target_server in this script
 2. Make sure that there are corresponding number of prop files:
    $base_dir/config/server_target{1..3}.properties
 
 Quick Start:
 ==================
-Execute this script as follows:
-  <kafka home>/system_test/broker_failure $ bin/run-test.sh
+In the directory <kafka home>/system_test/broker_failure,
+execute this script as follows:
+  $ bin/run-test.sh -n <num of iterations> -s <servers to bounce>
+
+num of iterations - the number of iterations that the test runs
+
+servers to bounce - the servers to be bounced in a round-robin fashion.
+
+    Values to be entered:
+        1 - source broker
+        2 - mirror maker
+        3 - target broker
+
+    Example:
+        * To bounce only mirror maker and target broker
+          in turns, enter the value 23.
+        * To bounce only mirror maker, enter the value 2.
+        * To run the test without bouncing, enter 0.
+
+At the end of the test, the checksums of the messages received
+from SOURCE & TARGET are compared. If all checksums match, the
+test is reported as PASSED. Otherwise, it is reported as FAILED.
 
 In the event of failure, by default the brokers and zookeepers
 remain running to make it easier to debug the issue - hit Ctrl-C
 to shut them down. 
-
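
Editor's note on the "servers to bounce" value described in the README
above: the digits of the -s value name the components to bounce
(1 - source broker, 2 - mirror maker, 3 - target broker, 0 - none).
The following is a minimal sketch of one way such a value could be
mapped to a component per iteration. It assumes the digits are cycled
through in order, one per iteration. The function name
bounce_target_for_iteration and the returned labels are hypothetical
and do not appear in run-test.sh; the script in this commit may
implement the selection differently.

```shell
# Hypothetical helper (not part of run-test.sh): print the component to
# bounce in a given iteration, based on the digit string passed via -s.
#   $1 - servers to bounce, e.g. "23" (assumed to be a string of digits)
#   $2 - 1-based iteration number
bounce_target_for_iteration() {
    bt_servers=$1
    bt_iteration=$2

    # "0" (or an empty value) means the test runs without bouncing.
    if [ -z "$bt_servers" ] || [ "$bt_servers" = "0" ]; then
        echo "none"
        return 0
    fi

    # Cycle through the digits: iteration 1 uses the first digit,
    # iteration 2 the second, and so on, wrapping around at the end.
    bt_count=${#bt_servers}
    bt_pos=$(( (bt_iteration - 1) % bt_count + 1 ))
    bt_choice=$(printf '%s' "$bt_servers" | cut -c "$bt_pos")

    case "$bt_choice" in
        1) echo "source_broker" ;;
        2) echo "mirror_maker" ;;
        3) echo "target_broker" ;;
        *) echo "unknown"; return 1 ;;
    esac
}

# Usage: show which component would be bounced in each of 4 iterations
# when the value 23 (mirror maker and target broker) is given.
for bt_i in 1 2 3 4
do
    echo "iteration $bt_i: $(bounce_target_for_iteration 23 $bt_i)"
done
```

Under the stated assumption, the value 23 alternates between the
mirror maker and the target broker, which matches the "in turns"
wording of the README example.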

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh Tue Jul 10 18:05:54 2012
@@ -14,151 +14,134 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# =================================================================
+# ===========
 # run-test.sh
 # ===========
-# 
-# This script performs broker failure tests with the following
-# setup in a single local machine:
-# 
-# 1. A cluster of Kafka source brokers
-# 2. A cluster of Kafka mirror brokers with embedded consumers in 
-#    point-to-point mode
-# 3. An independent ConsoleConsumer in publish/subcribe mode to
-#    consume messages from the SOURCE brokers cluster
-# 4. An independent ConsoleConsumer in publish/subcribe mode to
-#    consume messages from the MIRROR brokers cluster
-# 5. A producer produces batches of messages to the SOURCE brokers
-# 6. One of the Kafka SOURCE or MIRROR brokers in the cluster will
-#    be randomly terminated and waiting for the consumer to catch up.
-# 7. Repeat Step 4 & 5 as many times as specified in the script
-# 
-# Expected results:
-# ==================
-# There should not be any discrepancies by comparing the unique 
-# message checksums from the source ConsoleConsumer and the 
-# mirror ConsoleConsumer.
-# 
-# Notes:
-# ==================
-# The number of Kafka SOURCE brokers can be increased as follows:
-# 1. Update the value of $num_kafka_source_server in this script
-# 2. Make sure that there are corresponding number of prop files:
-#    $base_dir/config/server_source{1..4}.properties
-# 
-# The number of Kafka TARGET brokers can be increased as follows:
-# 1. Update the value of $num_kafka_target_server in this script
-# 2. Make sure that there are corresponding number of prop files:
-#    $base_dir/config/server_target{1..3}.properties
-# 
-# Quick Start:
-# ==================
-# Execute this script as follows:
-#   <kafka home>/system_test/broker_failure $ bin/run-test.sh
-# 
-# The expected output is given in bin/expected.out.
-# 
-# In the event of failure, by default the brokers and zookeepers
-# remain running to make it easier to debug the issue - hit Ctrl-C
-# to shut them down. 
-# =================================================================
-
-readonly base_dir=$(dirname $0)/..
-readonly test_start_time="$(date +%s)"
-
-readonly num_msg_per_batch=500
-readonly batches_per_iteration=5
-readonly num_iterations=5
-
-readonly zk_source_port=2181
-readonly zk_mirror_port=2182
-
-readonly topic_prefix=test
-readonly max_topic_id=2
-readonly unbalanced_start_id=2
-readonly consumer_grp=group1
+ 
+# ====================================
+# Do not change the following
+# (keep this section at the beginning
+# of this script)
+# ====================================
+readonly system_test_root=$(dirname $0)/../..        # path of <kafka install>/system_test
+readonly common_dir=${system_test_root}/common       # common util scripts for system_test
+source   ${common_dir}/util.sh                       # include the util script
+
+readonly base_dir=$(dirname $0)/..                   # the base dir of this test suite
+readonly test_start_time="$(date +%s)"               # time starting this test
+readonly bounce_source_id=1
+readonly bounce_mir_mkr_id=2
+readonly bounce_target_id=3
+readonly log4j_prop_file=$base_dir/config/log4j.properties
+
+iter=1                                               # init a counter to keep track of iterations
+num_iterations=5                                     # total no. of iterations to run
+svr_to_bounce=0                                      # servers to bounce: 1-source 2-mirror_maker 3-target
+                                                     # 12 - source & mirror_maker
+                                                     # 13 - source & target
+
+# ====================================
+# No need to change the following
+# configurations in most cases
+# ====================================
+readonly zk_source_port=2181                         # source zk port
+readonly zk_target_port=2182                         # target zk port
+readonly test_topic=test01                           # topic used in this test
+readonly consumer_grp=group1                         # consumer group
 readonly source_console_consumer_grp=source
-readonly mirror_console_consumer_grp=mirror
-readonly message_size=5000
-
-# sleep time between each batch of messages published
-# from producer - it will be randomly generated
-# within the range of sleep_min & sleep_max
-readonly sleep_min=3
-readonly sleep_max=3
-
-# requires same no. of property files such as:
-# $base_dir/config/server_source{1..4}.properties
-readonly num_kafka_source_server=4
-
-# requires same no. of property files such as:
-# $base_dir/config/server_target{1..3}.properties
-readonly num_kafka_target_server=3
-
-readonly wait_time_after_killing_broker=0
-readonly wait_time_after_restarting_broker=5
-
-readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094"
-readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093"
-
-background_producer_pid_1=
-background_producer_pid_2=
-
-no_bouncing=$#
-
-iter=1
-abort_test=false
-
+readonly target_console_consumer_grp=target
+readonly message_size=100
+readonly console_consumer_timeout_ms=15000
+readonly num_kafka_source_server=4                   # requires same no. of property files such as: 
+                                                     # $base_dir/config/server_source{1..4}.properties
+readonly num_kafka_target_server=3                   # requires same no. of property files such as: 
+                                                     # $base_dir/config/server_target{1..3}.properties
+readonly num_kafka_mirror_maker=3                    # any value greater than 0
+readonly wait_time_after_killing_broker=0            # wait after broker is stopped but before starting again
+readonly wait_time_after_restarting_broker=10
+
+# ====================================
+# Change the following as needed
+# ====================================
+num_msg_per_batch=500                                # no. of msg produced in each call to ProducerPerformance
+num_producer_threads=5                               # no. of producer threads to send msg
+producer_sleep_min=5                                 # min & max sleep time (in sec) between each 
+producer_sleep_max=5                                 # batch of messages sent from producer 
+
+# ====================================
+# zookeeper
+# ====================================
 pid_zk_source=
 pid_zk_target=
+zk_log4j_log=
 
+# ====================================
+# kafka source
+# ====================================
 kafka_source_pids=
 kafka_source_prop_files=
 kafka_source_log_files=
+kafka_topic_creation_log_file=$base_dir/kafka_topic_creation.log
+kafka_log4j_log=
 
+# ====================================
+# kafka target
+# ====================================
 kafka_target_pids=
 kafka_target_prop_files=
 kafka_target_log_files=
+
+# ====================================
+# mirror maker
+# ====================================
+kafka_mirror_maker_pids=
+kafka_mirror_maker_log_files=
+consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
 mirror_producer_prop_files=
 
+# ====================================
+# console consumer source
+# ====================================
 console_consumer_source_pid=
-console_consumer_mirror_pid=
-
 console_consumer_source_log=$base_dir/console_consumer_source.log
-console_consumer_mirror_log=$base_dir/console_consumer_mirror.log
+console_consumer_source_mid_log=$base_dir/console_consumer_source_mid.log
+console_consumer_source_mid_sorted_log=$base_dir/console_consumer_source_mid_sorted.log
+console_consumer_source_mid_sorted_uniq_log=$base_dir/console_consumer_source_mid_sorted_uniq.log
+
+# ====================================
+# console consumer target
+# ====================================
+console_consumer_target_pid=
+console_consumer_target_log=$base_dir/console_consumer_target.log
+console_consumer_target_mid_log=$base_dir/console_consumer_target_mid.log
+console_consumer_target_mid_sorted_log=$base_dir/console_consumer_target_mid_sorted.log
+console_consumer_target_mid_sorted_uniq_log=$base_dir/console_consumer_target_mid_sorted_uniq.log
+
+# ====================================
+# producer
+# ====================================
+background_producer_pid=
 producer_performance_log=$base_dir/producer_performance.log
-
-console_consumer_source_crc_log=$base_dir/console_consumer_source_crc.log
-console_consumer_source_crc_sorted_log=$base_dir/console_consumer_source_crc_sorted.log
-console_consumer_source_crc_sorted_uniq_log=$base_dir/console_consumer_source_crc_sorted_uniq.log
-
-console_consumer_mirror_crc_log=$base_dir/console_consumer_mirror_crc.log
-console_consumer_mirror_crc_sorted_log=$base_dir/console_consumer_mirror_crc_sorted.log
-console_consumer_mirror_crc_sorted_uniq_log=$base_dir/console_consumer_mirror_crc_sorted_uniq.log
-
-producer_performance_crc_log=$base_dir/producer_performance_crc.log
-producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log
-producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log
-
-consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log
-
-consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
+producer_performance_mid_log=$base_dir/producer_performance_mid.log
+producer_performance_mid_sorted_log=$base_dir/producer_performance_mid_sorted.log
+producer_performance_mid_sorted_uniq_log=$base_dir/producer_performance_mid_sorted_uniq.log
+tmp_file_to_stop_background_producer=/tmp/tmp_file_to_stop_background_producer
+
+# ====================================
+# test reports
+# ====================================
 checksum_diff_log=$base_dir/checksum_diff.log
 
-info() {
-    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
-}
-
-info_no_newline() {
-    echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*"
-}
 
+# ====================================
+# initialize prop and log files
+# ====================================
 initialize() {
     for ((i=1; i<=$num_kafka_target_server; i++))
     do
         kafka_target_prop_files[${i}]=$base_dir/config/server_target${i}.properties
         kafka_target_log_files[${i}]=$base_dir/kafka_target${i}.log
-        mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties
+        kafka_mirror_maker_log_files[${i}]=$base_dir/kafka_mirror_maker${i}.log
     done
 
     for ((i=1; i<=$num_kafka_source_server; i++))
@@ -166,109 +149,69 @@ initialize() {
         kafka_source_prop_files[${i}]=$base_dir/config/server_source${i}.properties
         kafka_source_log_files[${i}]=$base_dir/kafka_source${i}.log
     done
-}
 
-# =========================================
-# get_random_range - return a random number
-#     between the lower & upper bounds
-# usage:
-#     get_random_range $lower $upper
-#     random_no=$?
-# =========================================
-get_random_range() {
-    lo=$1
-    up=$2
-    range=$(($up - $lo + 1))
-
-    return $(($(($RANDOM % range)) + $lo))
-}
-
-verify_consumer_rebalancing() {
-
-    info "Verifying consumer rebalancing operation"
-
-    CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \
-                                 kafka.tools.VerifyConsumerRebalance \
-                                 --zk.connect=localhost:2181 \
-                                 --group $consumer_grp`
-    echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log
-
-    REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1`
-    # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE"
-    REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"`
-    info "REBALANCE_STATUS: $REBALANCE_STATUS"
-
-    if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then
-        info "setting abort_test to true due to Rebalance operation failed"
-        abort_test="true"
-    fi
+    for ((i=1; i<=$num_kafka_mirror_maker; i++))
+    do
+        mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties
+    done
+
+    zk_log4j_log=`grep "log4j.appender.zookeeperAppender.File=" $log4j_prop_file | awk -F '=' '{print $2}'`
+    kafka_log4j_log=`grep "log4j.appender.kafkaAppender.File=" $log4j_prop_file | awk -F '=' '{print $2}'`
 }
 
-wait_for_zero_consumer_lags() {
+# =========================================
+# cleanup
+# =========================================
+cleanup() {
+    info "cleaning up"
 
-    topic_id=$1
+    rm -rf $tmp_file_to_stop_background_producer
+    rm -rf $kafka_topic_creation_log_file
 
-    # no of times to check for zero lagging
-    no_of_zero_to_verify=3
+    rm -rf /tmp/zookeeper_source
+    rm -rf /tmp/zookeeper_target
 
-    while [ 'x' == 'x' ]
-    do
-        TOTAL_LAG=0
-        CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
-                       --group $consumer_grp --zkconnect localhost:$zk_source_port \
-                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+    rm -rf /tmp/kafka-source{1..4}-logs
+    rm -rf /tmp/kafka-target{1..3}-logs
 
-        for lag in $CONSUMER_LAGS;
-        do
-            TOTAL_LAG=$(($TOTAL_LAG + $lag))
-        done
+    rm -rf $zk_log4j_log
+    rm -rf $kafka_log4j_log
 
-        info "mirror TOTAL_LAG = $TOTAL_LAG"
-        if [ $TOTAL_LAG -eq 0 ]; then
-            if [ $no_of_zero_to_verify -eq 0 ]; then
-                echo
-                return 0
-            fi
-            no_of_zero_to_verify=$(($no_of_zero_to_verify - 1))
-        fi
-        sleep 1
+    for ((i=1; i<=$num_kafka_target_server; i++))
+    do
+        rm -rf ${kafka_target_log_files[${i}]}
+        rm -rf ${kafka_mirror_maker_log_files[${i}]}
     done
-}
-
-wait_for_zero_source_console_consumer_lags() {
 
-    topic_id=$1
+    rm -f $base_dir/zookeeper_source.log
+    rm -f $base_dir/zookeeper_target.log
+    rm -f $base_dir/kafka_source{1..4}.log
 
-    # no of times to check for zero lagging
-    no_of_zero_to_verify=3
+    rm -f $producer_performance_log
+    rm -f $producer_performance_mid_log
+    rm -f $producer_performance_mid_sorted_log
+    rm -f $producer_performance_mid_sorted_uniq_log
 
-    while [ 'x' == 'x' ]
-    do
-        TOTAL_LAG=0
-        CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
-                       --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \
-                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+    rm -f $console_consumer_target_log
+    rm -f $console_consumer_source_log
+    rm -f $console_consumer_target_mid_log
+    rm -f $console_consumer_source_mid_log
 
-        for lag in $CONSUMER_LAGS;
-        do
-            TOTAL_LAG=$(($TOTAL_LAG + $lag))
-        done
+    rm -f $checksum_diff_log
 
-        info "source console consumer TOTAL_LAG = $TOTAL_LAG"
-        if [ $TOTAL_LAG -eq 0 ]; then
-            if [ $no_of_zero_to_verify -eq 0 ]; then
-                echo
-                return 0
-            fi
-            no_of_zero_to_verify=$(($no_of_zero_to_verify - 1))
-        fi
-        sleep 1
-    done
+    rm -f $console_consumer_target_mid_sorted_log
+    rm -f $console_consumer_source_mid_sorted_log
+    rm -f $console_consumer_target_mid_sorted_uniq_log
+    rm -f $console_consumer_source_mid_sorted_uniq_log
 }
 
-wait_for_zero_mirror_console_consumer_lags() {
+# =========================================
+# wait_for_zero_consumer_lags
+# =========================================
+wait_for_zero_consumer_lags() {
 
-    topic_id=$1
+    this_group_name=$1
+    this_zk_port=$2
 
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
@@ -277,8 +220,10 @@ wait_for_zero_mirror_console_consumer_la
     do
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
-                       --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \
-                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --group $target_console_consumer_grp \
+                       --zkconnect localhost:$zk_target_port \
+                       --topic $test_topic \
+                       | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -297,58 +242,25 @@ wait_for_zero_mirror_console_consumer_la
     done
 }
 
-kill_child_processes() {
-    isTopmost=$1
-    curPid=$2
-    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
-
-    for childPid in $childPids
-    do
-        kill_child_processes 0 $childPid
-    done
-    if [ $isTopmost -eq 0 ]; then
-        kill -15 $curPid 2> /dev/null
-    fi
-}
-
-cleanup() {
-    info "cleaning up"
-
-    rm -rf /tmp/zookeeper_source
-    rm -rf /tmp/zookeeper_target
-
-    rm -rf /tmp/kafka-source{1..4}-logs
-    rm -rf /tmp/kafka-target{1..3}-logs
-
-    for ((i=1; i<=$num_kafka_target_server; i++))
-    do
-        rm -rf ${kafka_target_log_files[${i}]}
-    done
-
-    rm -f $base_dir/zookeeper_source.log
-    rm -f $base_dir/zookeeper_target.log
-    rm -f $base_dir/kafka_source{1..4}.log
-
-    rm -f $producer_performance_log
-    rm -f $producer_performance_crc_log
-    rm -f $producer_performance_crc_sorted_log
-    rm -f $producer_performance_crc_sorted_uniq_log
-
-    rm -f $console_consumer_mirror_log
-    rm -f $console_consumer_source_log
-    rm -f $console_consumer_mirror_crc_log
-    rm -f $console_consumer_source_crc_log
-
-    rm -f $checksum_diff_log
-
-    rm -f $console_consumer_mirror_crc_sorted_log
-    rm -f $console_consumer_source_crc_sorted_log
-    rm -f $console_consumer_mirror_crc_sorted_uniq_log
-    rm -f $console_consumer_source_crc_sorted_uniq_log
-
-    rm -f $consumer_rebalancing_log
+# =========================================
+# create_topic
+# =========================================
+create_topic() {
+    this_topic_to_create=$1
+    this_zk_conn_str=$2
+    this_replica_factor=$3
+
+    info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]"
+    $base_dir/../../bin/kafka-create-topic.sh \
+        --topic $this_topic_to_create \
+        --zookeeper $this_zk_conn_str \
+        --replica $this_replica_factor \
+        2> $kafka_topic_creation_log_file 
 }
 
+# =========================================
+# start_zk
+# =========================================
 start_zk() {
     info "starting zookeepers"
 
@@ -363,6 +275,9 @@ start_zk() {
     pid_zk_target=$!
 }
 
+# =========================================
+# start_source_servers_cluster
+# =========================================
 start_source_servers_cluster() {
     info "starting source cluster"
 
@@ -372,112 +287,130 @@ start_source_servers_cluster() {
     done
 }
 
+# =========================================
+# start_source_server
+# =========================================
 start_source_server() {
     s_idx=$1
 
     $base_dir/bin/kafka-run-class.sh kafka.Kafka \
         ${kafka_source_prop_files[$s_idx]} \
-        2>&1 >> ${kafka_source_log_files[$s_idx]} &    # append log msg after restarting
+        >> ${kafka_source_log_files[$s_idx]} 2>&1 &
     kafka_source_pids[${s_idx}]=$!
 
     info "  -> kafka_source_pids[$s_idx]: ${kafka_source_pids[$s_idx]}"
 }
 
+# =========================================
+# start_target_servers_cluster
+# =========================================
 start_target_servers_cluster() {
     info "starting mirror cluster"
 
     for ((i=1; i<=$num_kafka_target_server; i++))
     do
-        start_embedded_consumer_server $i
+        start_target_server $i
     done
 }
 
-start_embedded_consumer_server() {
+# =========================================
+# start_target_server
+# =========================================
+start_target_server() {
     s_idx=$1
 
     $base_dir/bin/kafka-run-class.sh kafka.Kafka \
         ${kafka_target_prop_files[${s_idx}]} \
-        $consumer_prop_file \
-        ${mirror_producer_prop_files[${s_idx}]} \
-        2>&1 >> ${kafka_target_log_files[${s_idx}]} &    # append log msg after restarting
+        >> ${kafka_target_log_files[${s_idx}]} 2>&1 &
     kafka_target_pids[$s_idx]=$!
 
     info "  -> kafka_target_pids[$s_idx]: ${kafka_target_pids[$s_idx]}"
 }
 
-start_console_consumer_for_source_producer() {
+# =========================================
+# start_target_mirror_maker
+# =========================================
+start_target_mirror_maker() {
+    info "starting mirror maker"
 
-    topic_id=$1
+    for ((i=1; i<=$num_kafka_mirror_maker; i++))
+    do
+        start_mirror_maker $i
+    done
+}
 
-    info "starting console consumers for source producer on topic id [$topic_id]"
+# =========================================
+# start_mirror_maker
+# =========================================
+start_mirror_maker() {
+    s_idx=$1
 
-    $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
-        --zookeeper localhost:$zk_source_port \
-        --topic ${topic_prefix}_${topic_id} \
-        --group $source_console_consumer_grp \
-        --from-beginning --consumer-timeout-ms 5000 \
-        --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
-        --property topic=${topic_prefix}_${topic_id} \
-        2>&1 >> ${console_consumer_source_log} 
+    $base_dir/bin/kafka-run-class.sh kafka.tools.MirrorMaker \
+        --consumer.config $consumer_prop_file \
+        --producer.config ${mirror_producer_prop_files[${s_idx}]} \
+        --whitelist=\".*\" \
+        >> ${kafka_mirror_maker_log_files[$s_idx]} 2>&1 &
+    kafka_mirror_maker_pids[${s_idx}]=$!
+
+    info "  -> kafka_mirror_maker_pids[$s_idx]: ${kafka_mirror_maker_pids[$s_idx]}"
 }
 
-start_console_consumer_for_mirror_producer() {
+# =========================================
+# start_console_consumer
+# =========================================
+start_console_consumer() {
 
-    topic_id=$1
+    this_consumer_grp=$1
+    this_consumer_zk_port=$2
+    this_consumer_log=$3
+    this_msg_formatter=$4
 
-    info "starting console consumers for mirroring producer on topic id [$topic_id]"
+    info "starting console consumers for $this_consumer_grp"
 
     $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
-        --zookeeper localhost:$zk_mirror_port \
-        --topic ${topic_prefix}_${topic_id} \
-        --group $mirror_console_consumer_grp \
-        --from-beginning --consumer-timeout-ms 5000 \
-        --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
-        --property topic=${topic_prefix}_${topic_id} \
-        2>&1 >> ${console_consumer_mirror_log} 
-}
+        --zookeeper localhost:$this_consumer_zk_port \
+        --topic $test_topic \
+        --group $this_consumer_grp \
+        --from-beginning \
+        --consumer-timeout-ms $console_consumer_timeout_ms \
+        --formatter "kafka.consumer.ConsoleConsumer\$${this_msg_formatter}" \
+        2>&1 > ${this_consumer_log} &
+    console_consumer_pid=$!
 
-consume_source_producer_messages() {
-    consumer_counter=1
-    while [ $consumer_counter -le $max_topic_id ]
-    do
-        start_console_consumer_for_source_producer $consumer_counter
-        consumer_counter=$(( $consumer_counter + 1 ))
-    done
+    info "  -> console consumer pid: $console_consumer_pid"
 }
 
-consume_mirror_producer_messages() {
-    consumer_counter=1
-    while [ $consumer_counter -le $max_topic_id ]
-    do
-        start_console_consumer_for_mirror_producer $consumer_counter
-        consumer_counter=$(( $consumer_counter + 1 ))
-    done
+# =========================================
+# force_shutdown_background_producer
+# - to be called when the user presses Ctrl-C
+# =========================================
+force_shutdown_background_producer() {
+    info "force shutting down producer"
+    `ps auxw | grep "run\-test\|ProducerPerformance" | grep -v grep | awk '{print $2}' | xargs kill -9`
 }
 
-shutdown_producer() {
-    info "shutting down producer"
-    if [ "x${background_producer_pid_1}" != "x" ]; then
-        # kill_child_processes 0 ${background_producer_pid_1};
-        kill -TERM ${background_producer_pid_1} 2> /dev/null;
-    fi
-
-    if [ "x${background_producer_pid_2}" != "x" ]; then
-        # kill_child_processes 0 ${background_producer_pid_2};
-        kill -TERM ${background_producer_pid_2} 2> /dev/null;
-    fi
+# =========================================
+# force_shutdown_consumer
+# - to be called when the user presses Ctrl-C
+# =========================================
+force_shutdown_consumer() {
+    info "force shutting down consumer"
+    `ps auxw | grep ChecksumMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9`
 }
 
+# =========================================
+# shutdown_servers
+# =========================================
 shutdown_servers() {
-    info "shutting down mirror console consumer"
-    if [ "x${console_consumer_mirror_pid}" != "x" ]; then 
-        kill_child_processes 0 ${console_consumer_mirror_pid};
-    fi
 
-    info "shutting down source console consumer"
-    if [ "x${console_consumer_source_pid}" != "x" ]; then 
-        kill_child_processes 0 ${console_consumer_source_pid};
-    fi
+    info "shutting down mirror makers"
+    for ((i=1; i<=$num_kafka_mirror_maker; i++))
+    do
+        #info "stopping mm pid: ${kafka_mirror_maker_pids[$i]}"
+        if [ "x${kafka_mirror_maker_pids[$i]}" != "x" ]; then
+            kill_child_processes 0 ${kafka_mirror_maker_pids[$i]};
+        fi
+    done
 
     info "shutting down target servers"
     for ((i=1; i<=$num_kafka_target_server; i++))
@@ -500,84 +433,79 @@ shutdown_servers() {
     if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
 }
 
+# =========================================
+# start_background_producer
+# =========================================
 start_background_producer() {
-    bkrinfo_str=$1
-    start_topic_id=$2
-    end_topic_id=$3
+
+    topic=$1
 
     batch_no=0
-    topic_id=${start_topic_id}
 
-    while [ 'x' == 'x' ]
+    while [ ! -e $tmp_file_to_stop_background_producer ]
     do
-        sleeptime=
-
-        get_random_range $sleep_min $sleep_max
-        sleeptime=$?
-
-        batch_no=$(($batch_no + 1))
-
-        if [ $topic_id -gt $end_topic_id ]; then
-            topic_id=${start_topic_id}
-        fi
+        sleeptime=$(get_random_range $producer_sleep_min $producer_sleep_max)
 
+        info "producing $num_msg_per_batch messages on topic '$topic'"
         $base_dir/bin/kafka-run-class.sh \
             kafka.perf.ProducerPerformance \
-            --brokerinfo $bkrinfo_str \
-            --topic ${topic_prefix}_${topic_id} \
+            --brokerinfo zk.connect=localhost:2181 \
+            --topic $topic \
             --messages $num_msg_per_batch \
             --message-size $message_size \
-            --batch-size 50 \
-            --vary-message-size \
-            --threads 1 \
-            --reporting-interval $num_msg_per_batch --async \
+            --threads $num_producer_threads \
+            --initial-message-id $batch_no \
             2>&1 >> $base_dir/producer_performance.log    # appending all producers' msgs
 
-        topic_id=$(( $topic_id + 1 ))
-
+        batch_no=$(($batch_no + $num_msg_per_batch))
         sleep $sleeptime
     done
 }
 
+# =========================================
+# cmp_checksum
+# =========================================
 cmp_checksum() {
 
     cmp_result=0
 
-    grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
-    grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
-    grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log
+    grep MessageID $console_consumer_source_log | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_source_mid_log
+    grep MessageID $console_consumer_target_log | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_target_mid_log
+    grep MessageID $producer_performance_log    | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $producer_performance_mid_log
 
-    sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log
-    sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log
-    sort $producer_performance_crc_log > $producer_performance_crc_sorted_log
+    sort $console_consumer_target_mid_log > $console_consumer_target_mid_sorted_log
+    sort $console_consumer_source_mid_log > $console_consumer_source_mid_sorted_log
+    sort $producer_performance_mid_log > $producer_performance_mid_sorted_log
 
-    sort -u $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_uniq_log
-    sort -u $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_uniq_log
-    sort -u $producer_performance_crc_log > $producer_performance_crc_sorted_uniq_log
+    sort -u $console_consumer_target_mid_log > $console_consumer_target_mid_sorted_uniq_log
+    sort -u $console_consumer_source_mid_log > $console_consumer_source_mid_sorted_uniq_log
+    sort -u $producer_performance_mid_log > $producer_performance_mid_sorted_uniq_log
 
-    msg_count_from_source_consumer=`cat $console_consumer_source_crc_log | wc -l | tr -d ' '`
-    uniq_msg_count_from_source_consumer=`cat $console_consumer_source_crc_sorted_uniq_log | wc -l | tr -d ' '`
+    msg_count_from_source_consumer=`cat $console_consumer_source_mid_log | wc -l | tr -d ' '`
+    uniq_msg_count_from_source_consumer=`cat $console_consumer_source_mid_sorted_uniq_log | wc -l | tr -d ' '`
 
-    msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_log | wc -l | tr -d ' '`
-    uniq_msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_sorted_uniq_log | wc -l | tr -d ' '`
+    msg_count_from_mirror_consumer=`cat $console_consumer_target_mid_log | wc -l | tr -d ' '`
+    uniq_msg_count_from_mirror_consumer=`cat $console_consumer_target_mid_sorted_uniq_log | wc -l | tr -d ' '`
 
-    uniq_msg_count_from_producer=`cat $producer_performance_crc_sorted_uniq_log | wc -l | tr -d ' '`
+    uniq_msg_count_from_producer=`cat $producer_performance_mid_sorted_uniq_log | wc -l | tr -d ' '`
 
-    total_msg_published=`cat $producer_performance_crc_log | wc -l | tr -d ' '`
+    total_msg_published=`cat $producer_performance_mid_log | wc -l | tr -d ' '`
 
     duplicate_msg_in_producer=$(( $total_msg_published - $uniq_msg_count_from_producer ))
 
-    crc_only_in_mirror_consumer=`comm -23 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
-    crc_only_in_source_consumer=`comm -13 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
-    crc_common_in_both_consumer=`comm -12 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
+    crc_only_in_mirror_consumer=`comm -23 $console_consumer_target_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log`
+    crc_only_in_source_consumer=`comm -13 $console_consumer_target_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log`
+    crc_common_in_both_consumer=`comm -12 $console_consumer_target_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log`
 
-    crc_only_in_producer=`comm -23 $producer_performance_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
+    crc_only_in_producer=`comm -23 $producer_performance_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log`
 
-    duplicate_mirror_crc=`comm -23 $console_consumer_mirror_crc_sorted_log $console_consumer_mirror_crc_sorted_uniq_log` 
+    duplicate_mirror_mid=`comm -23 $console_consumer_target_mid_sorted_log $console_consumer_target_mid_sorted_uniq_log` 
     no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \
                           + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \
                           2*$duplicate_msg_in_producer ))
 
+    source_mirror_uniq_msg_diff=$(($uniq_msg_count_from_source_consumer - $uniq_msg_count_from_mirror_consumer))
+
     echo ""
     echo "========================================================"
     echo "no. of messages published            : $total_msg_published"
@@ -587,8 +515,7 @@ cmp_checksum() {
     echo "mirror consumer msg rec'd            : $msg_count_from_mirror_consumer"
     echo "mirror consumer unique msg rec'd     : $uniq_msg_count_from_mirror_consumer"
     echo "total source/mirror duplicate msg    : $no_of_duplicate_msg"
-    echo "source/mirror uniq msg count diff    : $(($uniq_msg_count_from_source_consumer - \
-                                                 $uniq_msg_count_from_mirror_consumer))"
+    echo "source/mirror uniq msg count diff    : $source_mirror_uniq_msg_diff"
     echo "========================================================"
     echo "(Please refer to $checksum_diff_log for more details)"
     echo ""
@@ -611,96 +538,75 @@ cmp_checksum() {
     echo "========================================================" >> $checksum_diff_log
     echo "duplicate crc in mirror consumer"                         >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
-    echo "${duplicate_mirror_crc}"                                  >> $checksum_diff_log
-
-    topic_chksum_counter=1
-    while [ $topic_chksum_counter -le $max_topic_id ]
-    do
-        # get producer topic counts
-        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log`
-        echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
-
-        topic_chksum_counter=$(($topic_chksum_counter + 1))
-    done
-    echo
-
-    topic_chksum_counter=1
-    while [ $topic_chksum_counter -le $max_topic_id ]
-    do
-        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log`
-        echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
-
-        topic_chksum_counter=$(($topic_chksum_counter + 1))
-    done
-    echo
+    echo "${duplicate_mirror_mid}"                                  >> $checksum_diff_log
 
-    topic_chksum_counter=1
-    while [ $topic_chksum_counter -le $max_topic_id ]
-    do
-        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log`
-        echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
-
-        topic_chksum_counter=$(($topic_chksum_counter + 1))
-    done
+    echo "================="
+    if [[ $source_mirror_uniq_msg_diff -eq 0 && $uniq_msg_count_from_source_consumer -gt 0 ]]; then
+        echo "## Test PASSED"
+    else
+        echo "## Test FAILED"
+    fi
+    echo "================="
     echo
 
     return $cmp_result
 }
 
+# =========================================
+# start_test
+# =========================================
 start_test() {
 
+    echo
+    info "==========================================================="
+    info "#### Starting Kafka Broker / Mirror Maker Failure Test ####"
+    info "==========================================================="
+    echo
+
     start_zk
     sleep 2
+
     start_source_servers_cluster
     sleep 2
+
+    create_topic $test_topic localhost:$zk_source_port 1
+    sleep 2
+
     start_target_servers_cluster
     sleep 2
 
-    start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) &
-    background_producer_pid_1=$!
+    start_target_mirror_maker
+    sleep 2
 
-    info "=========================================="
-    info "Started background producer pid [${background_producer_pid_1}]"
-    info "=========================================="
+    start_background_producer $test_topic &
+    background_producer_pid=$!
 
-    sleep 10
+    info "Started background producer pid [${background_producer_pid}]"
+    sleep 5
    
-    start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id &
-    background_producer_pid_2=$!
-
-    info "=========================================="
-    info "Started background producer pid [${background_producer_pid_2}]"
-    info "=========================================="
-
-    sleep 10
-
-    verify_consumer_rebalancing
-
-    info "abort_test: [${abort_test}]"
-    if [ "${abort_test}_x" == "true_x" ]; then
-        info "aborting test"
-        iter=$((${num_iterations} + 1))
-    fi
- 
+    # loop for no. of iterations specified in $num_iterations 
     while [ $num_iterations -ge $iter ]
     do
-        echo
-        info "=========================================="
-        info "Iteration $iter of ${num_iterations}"
-        info "=========================================="
+        # if $svr_to_bounce is '0', it means no bouncing
+        if [[ $num_iterations -ge $iter && $svr_to_bounce -gt 0 ]]; then
+            idx=
 
-        # terminate the broker if not the last iteration:
-        if [[ $num_iterations -gt $iter && $no_bouncing -eq 0 ]]; then
+            # check which type of broker bouncing is requested: source, mirror_maker or target
 
-            idx=
+            # $svr_to_bounce contains $bounce_target_id - eg. '3', '123', ... etc
+            svr_idx=`expr index $svr_to_bounce $bounce_target_id`
+            if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then
+                echo
+                info "=========================================="
+                info "Iteration $iter of ${num_iterations}"
+                info "=========================================="
+
+                # bounce target kafka broker
+                idx=$(get_random_range 1 $num_kafka_target_server)
 
-            if [ $(( $iter % 2 )) -eq 0 ]; then
-                # even iterations -> bounce target kafka borker
-                get_random_range 1 $num_kafka_target_server 
-                idx=$?
                 if [ "x${kafka_target_pids[$idx]}" != "x" ]; then
                     echo
-                    info "#### Bouncing kafka TARGET broker ####"
+                    info "#### Bouncing Kafka TARGET Broker ####"
 
                     info "terminating kafka target[$idx] with process id ${kafka_target_pids[$idx]}"
                     kill_child_processes 0 ${kafka_target_pids[$idx]}
@@ -709,19 +615,56 @@ start_test() {
                     sleep $wait_time_after_killing_broker
 
                     info "starting kafka target server"
-                    start_embedded_consumer_server $idx
+                    start_target_server $idx
+                fi
+                iter=$(($iter+1))
+                info "sleeping for ${wait_time_after_restarting_broker}s"
+                sleep $wait_time_after_restarting_broker
+            fi
+
+            # $svr_to_bounce contains $bounce_mir_mkr_id - eg. '2', '123', ... etc
+            svr_idx=`expr index $svr_to_bounce $bounce_mir_mkr_id`
+            if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then
+                echo
+                info "=========================================="
+                info "Iteration $iter of ${num_iterations}"
+                info "=========================================="
 
-                    info "sleeping for ${wait_time_after_restarting_broker}s"
-                    sleep $wait_time_after_restarting_broker
+                # bounce mirror maker
+                idx=$(get_random_range 1 $num_kafka_mirror_maker)
+
+                if [ "x${kafka_mirror_maker_pids[$idx]}" != "x" ]; then
+                    echo
+                    info "#### Bouncing Kafka Mirror Maker ####"
+
+                    info "terminating kafka mirror maker [$idx] with process id ${kafka_mirror_maker_pids[$idx]}"
+                    kill_child_processes 0 ${kafka_mirror_maker_pids[$idx]}
+
+                    info "sleeping for ${wait_time_after_killing_broker}s"
+                    sleep $wait_time_after_killing_broker
+
+                    info "starting kafka mirror maker"
+                    start_mirror_maker $idx
                 fi
-            else
-                # odd iterations -> bounce source kafka broker
-                get_random_range 1 $num_kafka_source_server 
-                idx=$?
+                iter=$(($iter+1))
+                info "sleeping for ${wait_time_after_restarting_broker}s"
+                sleep $wait_time_after_restarting_broker
+            fi
+
+            # $svr_to_bounce contains $bounce_source_id - eg. '1', '123', ... etc
+            svr_idx=`expr index $svr_to_bounce $bounce_source_id`
+            if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then
+                echo
+                info "=========================================="
+                info "Iteration $iter of ${num_iterations}"
+                info "=========================================="
+
+                # bounce source kafka broker
+                idx=$(get_random_range 1 $num_kafka_source_server)
 
                 if [ "x${kafka_source_pids[$idx]}" != "x" ]; then
                     echo
-                    info "#### Bouncing kafka SOURCE broker ####"
+                    info "#### Bouncing Kafka SOURCE Broker ####"
 
                     info "terminating kafka source[$idx] with process id ${kafka_source_pids[$idx]}"
                     kill_child_processes 0 ${kafka_source_pids[$idx]}
@@ -731,69 +674,142 @@ start_test() {
 
                     info "starting kafka source server"
                     start_source_server $idx
-
-                    info "sleeping for ${wait_time_after_restarting_broker}s"
-                    sleep $wait_time_after_restarting_broker
                 fi
-            fi
-
-            verify_consumer_rebalancing
-
-            info "abort_test: [${abort_test}]"
-            if [ "${abort_test}_x" == "true_x" ]; then
-                info "aborting test"
-                iter=$((${num_iterations} + 1))
-            fi
-
+                iter=$(($iter+1))
+                info "sleeping for ${wait_time_after_restarting_broker}s"
+                sleep $wait_time_after_restarting_broker
+            fi
         else
+            echo
+            info "=========================================="
+            info "Iteration $iter of ${num_iterations}"
+            info "=========================================="
+
             info "No bouncing performed"
+            iter=$(($iter+1))
+            info "sleeping for ${wait_time_after_restarting_broker}s"
+            sleep $wait_time_after_restarting_broker
         fi
-
-        info "sleeping for 10 sec"
-        sleep 10
-
-        iter=$(($iter+1))
     done
 
+    # notify background producer to stop
+    touch $tmp_file_to_stop_background_producer
+
     echo
     info "Tests completed. Waiting for consumers to catch up "
-    
-    shutdown_producer
 
-    wait_for_zero_consumer_lags
+    # =======================================================
+    # remove the following 'sleep 30' when KAFKA-313 is fixed
+    # =======================================================
+    info "sleeping 30 sec"
+    sleep 30
+}
+
+# =========================================
+# print_usage
+# =========================================
+print_usage() {
+    echo
+    echo "Error : invalid no. of arguments"
+    echo "Usage : $0 -n <no. of iterations> -s <servers to bounce>"
+    echo
+    echo "  num of iterations - the number of iterations that the test runs"
+    echo
+    echo "  servers to bounce - the servers to be bounced in a round-robin fashion"
+    echo "      Values of the servers:"
+    echo "        0 - no bouncing"
+    echo "        1 - source broker"
+    echo "        2 - mirror maker"
+    echo "        3 - target broker"
+    echo "      Example:"
+    echo "        * To bounce only mirror maker and target broker"
+    echo "          in turns, enter the value 23"
+    echo "        * To bounce only mirror maker, enter the value 2"
+    echo "        * To run the test without bouncing, enter 0"
+    echo
+    echo "Usage Example : $0 -n 10 -s 12"
+    echo "  (run 10 iterations and bounce source broker (1) + mirror maker (2) in turn)"
+    echo
 }
 
 
-# =====================
-# main test begins here
-# =====================
+# =========================================
+#
+#         Main test begins here
+#
+# =========================================
 
-echo
-info "============================================"
-info "#### Starting Kafka Broker Failure Test ####"
-info "============================================"
-echo
+# get command line arguments
+while getopts "hb:i:n:s:x:" opt
+do
+    case $opt in
+      b)
+        num_msg_per_batch=$OPTARG
+        ;;
+      h)
+        print_usage
+        exit
+        ;;
+      i)
+        producer_sleep_min=$OPTARG
+        ;;
+      n)
+        num_iterations=$OPTARG
+        ;;
+      s)
+        svr_to_bounce=$OPTARG
+        ;;
+      x)
+        producer_sleep_max=$OPTARG
+        ;;
+      ?)
+        print_usage
+        exit
+        ;;
+    esac
+done
 
+# initialize and cleanup
 initialize
 cleanup
 sleep 5
 
 # Ctrl-c trap. Catches INT signal
-trap "shutdown_producer; shutdown_servers; cmp_checksum; exit 0" INT
+trap "shutdown_servers; force_shutdown_consumer; force_shutdown_background_producer; cmp_checksum; exit 0" INT
 
+# starting the test
 start_test
 
-consume_source_producer_messages
-consume_mirror_producer_messages
+# starting consumer to consume data in source
+start_console_consumer $source_console_consumer_grp $zk_source_port $console_consumer_source_log DecodedMessageFormatter
 
-wait_for_zero_source_console_consumer_lags
-wait_for_zero_mirror_console_consumer_lags
+# starting consumer to consume data in target
+start_console_consumer $target_console_consumer_grp $zk_target_port $console_consumer_target_log DecodedMessageFormatter
 
-verify_consumer_rebalancing
+# wait for zero source consumer lags
+wait_for_zero_consumer_lags $source_console_consumer_grp $zk_source_port
+
+# wait for zero target consumer lags
+wait_for_zero_consumer_lags $target_console_consumer_grp $zk_target_port
+
+# =======================================================
+# remove the following 'sleep 30' when KAFKA-313 is fixed
+# =======================================================
+info "sleeping 30 sec"
+sleep 30
 
 shutdown_servers
 
 cmp_checksum
 result=$?
 
+# ===============================================
+# Report the time taken
+# ===============================================
+test_end_time="$(date +%s)"
+total_test_time_sec=$(( $test_end_time - $test_start_time ))
+total_test_time_min=$(( $total_test_time_sec / 60 ))
+info "Total time taken: $total_test_time_min min for $num_iterations iterations"
+echo
+
 exit $result
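
Note on the message-ID comparison in cmp_checksum above: the check extracts one
message ID per line from each log, de-duplicates with sort -u, and uses comm to
find IDs present on one side only. The following is a minimal standalone sketch
of that idea, not the code committed in run-test.sh. The function name
compare_message_ids and its two file arguments are hypothetical and exist only
for illustration; the inputs are assumed to hold one message ID per line.

```shell
#!/usr/bin/env bash
# Sketch only: compare_message_ids and its arguments are hypothetical names,
# not part of run-test.sh. Each input file holds one message ID per line.

compare_message_ids() {
    local produced_file=$1
    local consumed_file=$2
    local workdir received unique missing

    workdir=$(mktemp -d)

    # comm needs both inputs sorted in the same collation order;
    # sort -u also drops duplicate IDs.
    LC_ALL=C sort -u "$produced_file" > "$workdir/produced.uniq"
    LC_ALL=C sort -u "$consumed_file" > "$workdir/consumed.uniq"

    # Total lines received vs. distinct IDs received.
    received=$(wc -l < "$consumed_file" | tr -d ' ')
    unique=$(wc -l < "$workdir/consumed.uniq" | tr -d ' ')

    # comm -23 keeps lines found only in the first file:
    # IDs that were produced but never consumed.
    missing=$(LC_ALL=C comm -23 "$workdir/produced.uniq" "$workdir/consumed.uniq" \
              | wc -l | tr -d ' ')

    echo "received=$received unique=$unique duplicates=$((received - unique)) missing=$missing"
    rm -rf "$workdir"
}
```

For example, with a produced file containing the IDs 0, 1, 2, 3 and a consumed
file containing 0, 1, 1, 3, the function prints
"received=4 unique=3 duplicates=1 missing=1". Forcing LC_ALL=C on both sort and
comm keeps the two tools in agreement about ordering, which comm depends on.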

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties Tue Jul 10 18:05:54 2012
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=INFO, stdout, kafkaAppender
+log4j.rootLogger=INFO, stdout
 
 # ====================================
 # messages going to kafkaAppender
@@ -27,7 +27,7 @@ log4j.logger.org.apache.zookeeper=INFO, 
 # ====================================
 # (comment out this line to redirect ZK-related messages to kafkaAppender
 #  to allow reading both Kafka and ZK debugging messages in a single file)
-#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
+log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
 
 # ====================================
 # stdout
@@ -73,6 +73,7 @@ log4j.additivity.org.apache.zookeeper=fa
 
 log4j.logger.kafka.consumer=DEBUG
 log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
+log4j.logger.kafka.tools.ConsumerOffsetChecker=DEBUG
 
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties Tue Jul 10 18:05:54 2012
@@ -15,7 +15,8 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9081
+#broker.list=0:localhost:9081
+zk.connect=localhost:2182
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties Tue Jul 10 18:05:54 2012
@@ -15,7 +15,8 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9082
+#broker.list=0:localhost:9082
+zk.connect=localhost:2182
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties Tue Jul 10 18:05:54 2012
@@ -15,7 +15,8 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9083
+#broker.list=0:localhost:9083
+zk.connect=localhost:2182
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties Tue Jul 10 18:05:54 2012
@@ -74,8 +74,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# set sendBufferSize
-send.buffer.size=10000
-
-# set receiveBufferSize
-receive.buffer.size=10000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties Tue Jul 10 18:05:54 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
 socket.receive.buffer=1048576
 
 # the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
 
 # the interval between running cleanup on the logs
 log.cleanup.interval.mins=1
@@ -74,9 +74,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# set sendBufferSize
-send.buffer.size=500000
-
-# set receiveBufferSize
-receive.buffer.size=500000
-

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties Tue Jul 10 18:05:54 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
 socket.receive.buffer=1048576
 
 # the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
 
 # the interval between running cleanup on the logs
 log.cleanup.interval.mins=1
@@ -74,9 +74,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# set sendBufferSize
-send.buffer.size=500000
-
-# set receiveBufferSize
-receive.buffer.size=500000
-

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties Tue Jul 10 18:05:54 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
 socket.receive.buffer=1048576
 
 # the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
 
 # the interval between running cleanup on the logs
 log.cleanup.interval.mins=1
@@ -74,9 +74,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# set sendBufferSize
-send.buffer.size=500000
-
-# set receiveBufferSize
-receive.buffer.size=500000
-

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties Tue Jul 10 18:05:54 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
 socket.receive.buffer=1048576
 
 # the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
 
 # the interval between running cleanup on the logs
 log.cleanup.interval.mins=1
@@ -77,9 +77,3 @@ log.default.flush.scheduler.interval.ms=
 # topic partition count map
 # topic.partition.count.map=topic1:3, topic2:4
 
-# set sendBufferSize
-send.buffer.size=500000
-
-# set receiveBufferSize
-receive.buffer.size=500000
-

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties Tue Jul 10 18:05:54 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
 socket.receive.buffer=1048576
 
 # the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
 
 # the interval between running cleanup on the logs
 log.cleanup.interval.mins=1
@@ -77,9 +77,3 @@ log.default.flush.scheduler.interval.ms=
 # topic partition count map
 # topic.partition.count.map=topic1:3, topic2:4
 
-# set sendBufferSize
-send.buffer.size=500000
-
-# set receiveBufferSize
-receive.buffer.size=500000
-

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties?rev=1359812&r1=1359811&r2=1359812&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties Tue Jul 10 18:05:54 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
 socket.receive.buffer=1048576
 
 # the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
 
 # the interval between running cleanup on the logs
 log.cleanup.interval.mins=1
@@ -77,9 +77,3 @@ log.default.flush.scheduler.interval.ms=
 # topic partition count map
 # topic.partition.count.map=topic1:3, topic2:4
 
-# set sendBufferSize
-send.buffer.size=500000
-
-# set receiveBufferSize
-receive.buffer.size=500000
-