Posted to commits@kafka.apache.org by jj...@apache.org on 2012/07/06 22:44:52 UTC
svn commit: r1358407 - in /incubator/kafka/trunk/system_test:
broker_failure/ broker_failure/bin/ broker_failure/config/ common/
Author: jjkoshy
Date: Fri Jul 6 20:44:51 2012
New Revision: 1358407
URL: http://svn.apache.org/viewvc?rev=1358407&view=rev
Log:
KAFKA-373 Fix trunk broker failure test to work with mirror maker; patched by John Fung; reviewed by Joel Koshy
Added:
incubator/kafka/trunk/system_test/common/
incubator/kafka/trunk/system_test/common/util.sh
Modified:
incubator/kafka/trunk/system_test/broker_failure/README
incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties
incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties
incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties
Modified: incubator/kafka/trunk/system_test/broker_failure/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/README?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/README (original)
+++ incubator/kafka/trunk/system_test/broker_failure/README Fri Jul 6 20:44:51 2012
@@ -1,23 +1,34 @@
-This script performs broker failure tests with the following
-setup in a single local machine:
-
-1. A cluster of Kafka source brokers
-2. A cluster of Kafka mirror brokers with embedded consumers in
- point-to-point mode
-3. An independent ConsoleConsumer in publish/subcribe mode to
+** Please note that the following commands should be executed
+ after downloading the kafka source code to build all the
+ required binaries:
+ 1. <kafka install dir>/ $ ./sbt update
+ 2. <kafka install dir>/ $ ./sbt package
+
+ Now you are ready to follow the steps below.
+
+This script performs broker failure tests in an environment with
+Mirrored Source & Target clusters in a single machine:
+
+1. Start a cluster of Kafka source brokers
+2. Start a cluster of Kafka target brokers
+3. Start one or more Mirror Maker to create mirroring between
+ source and target clusters
+4. A producer produces batches of messages to the SOURCE brokers
+ in the background
+5. The Kafka SOURCE, TARGET brokers and Mirror Maker will be
+ terminated in a round-robin fashion and wait for the consumer
+ to catch up.
+6. Repeat step 5 as many times as specified in the script
+7. An independent ConsoleConsumer in publish/subcribe mode to
consume messages from the SOURCE brokers cluster
-4. An independent ConsoleConsumer in publish/subcribe mode to
- consume messages from the MIRROR brokers cluster
-5. A producer produces batches of messages to the SOURCE brokers
-6. One of the Kafka SOURCE or MIRROR brokers in the cluster will
- be randomly terminated and waiting for the consumer to catch up.
-7. Repeat Step 4 & 5 as many times as specified in the script
+8. An independent ConsoleConsumer in publish/subcribe mode to
+ consume messages from the TARGET brokers cluster
Expected results:
==================
There should not be any discrepancies by comparing the unique
message checksums from the source ConsoleConsumer and the
-mirror ConsoleConsumer.
+target ConsoleConsumer.
Notes:
==================
@@ -26,17 +37,36 @@ The number of Kafka SOURCE brokers can b
2. Make sure that there are corresponding number of prop files:
$base_dir/config/server_source{1..4}.properties
-The number of Kafka MIRROR brokers can be increased as follows:
+The number of Kafka TARGET brokers can be increased as follows:
1. Update the value of $num_kafka_target_server in this script
2. Make sure that there are corresponding number of prop files:
$base_dir/config/server_target{1..3}.properties
Quick Start:
==================
-Execute this script as follows:
- <kafka home>/system_test/broker_failure $ bin/run-test.sh
+In the directory <kafka home>/system_test/broker_failure,
+execute this script as following:
+ $ bin/run-test.sh -n <num of iterations> -s <servers to bounce>
+
+num of iterations - the number of iterations that the test runs
+
+servers to bounce - the servers to be bounced in a round-robin fashion.
+
+ Values to be entered:
+ 1 - source broker
+ 2 - mirror maker
+ 3 - target broker
+
+ Example:
+ * To bounce only mirror maker and target broker
+ in turns, enter the value 23.
+ * To bounce only mirror maker, enter the value 2.
+ * To run the test without bouncing, enter 0.
+
+At the end of the test, the received messages checksums in both
+SOURCE & TARGET will be compared. If all checksums are matched,
+the test is PASSED. Otherwise, the test is FAILED.
In the event of failure, by default the brokers and zookeepers
remain running to make it easier to debug the issue - hit Ctrl-C
to shut them down.
-
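Editorial note (not part of the commit): the README above says that the
digits of the -s value name the components to bounce "in a round-robin
fashion". One possible reading of that rule is sketched below. The
function name pick_bounce_target and its this_* variables are
hypothetical and are not taken from run-test.sh; only the id mapping
(1 - source broker, 2 - mirror maker, 3 - target broker, 0 - no
bouncing) comes from the README.

```shell
#!/bin/sh
# Hypothetical sketch, not the code committed in run-test.sh.
# Maps a 1-based iteration number to the component to bounce by
# cycling through the digits of the -s value.

pick_bounce_target() {
    this_svr_to_bounce=$1    # e.g. "23", or "0" to run without bouncing
    this_iter=$2             # 1-based iteration counter

    if [ "$this_svr_to_bounce" = "0" ]; then
        echo "none"
        return 0
    fi

    # number of digits in the -s value, e.g. 2 for "23"
    this_num_choices=${#this_svr_to_bounce}

    # 0-based position of the digit to use in this iteration
    this_idx=$(( (this_iter - 1) % this_num_choices ))

    # extract that single digit (cut counts characters from 1)
    this_id=$(printf '%s' "$this_svr_to_bounce" | cut -c "$((this_idx + 1))")

    case "$this_id" in
        1) echo "source broker" ;;
        2) echo "mirror maker" ;;
        3) echo "target broker" ;;
        *) echo "unknown id: $this_id" >&2; return 1 ;;
    esac
}

# Example: -s 23 alternates between mirror maker and target broker.
for iter in 1 2 3 4; do
    echo "iteration $iter: bounce $(pick_bounce_target 23 "$iter")"
done
```

With -s 23, this sketch selects the mirror maker on odd iterations and
the target broker on even ones. The actual script may order or select
the components differently.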
Modified: incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh Fri Jul 6 20:44:51 2012
@@ -14,151 +14,133 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# =================================================================
+# ===========
# run-test.sh
# ===========
-#
-# This script performs broker failure tests with the following
-# setup in a single local machine:
-#
-# 1. A cluster of Kafka source brokers
-# 2. A cluster of Kafka mirror brokers with embedded consumers in
-# point-to-point mode
-# 3. An independent ConsoleConsumer in publish/subcribe mode to
-# consume messages from the SOURCE brokers cluster
-# 4. An independent ConsoleConsumer in publish/subcribe mode to
-# consume messages from the MIRROR brokers cluster
-# 5. A producer produces batches of messages to the SOURCE brokers
-# 6. One of the Kafka SOURCE or MIRROR brokers in the cluster will
-# be randomly terminated and waiting for the consumer to catch up.
-# 7. Repeat Step 4 & 5 as many times as specified in the script
-#
-# Expected results:
-# ==================
-# There should not be any discrepancies by comparing the unique
-# message checksums from the source ConsoleConsumer and the
-# mirror ConsoleConsumer.
-#
-# Notes:
-# ==================
-# The number of Kafka SOURCE brokers can be increased as follows:
-# 1. Update the value of $num_kafka_source_server in this script
-# 2. Make sure that there are corresponding number of prop files:
-# $base_dir/config/server_source{1..4}.properties
-#
-# The number of Kafka TARGET brokers can be increased as follows:
-# 1. Update the value of $num_kafka_target_server in this script
-# 2. Make sure that there are corresponding number of prop files:
-# $base_dir/config/server_target{1..3}.properties
-#
-# Quick Start:
-# ==================
-# Execute this script as follows:
-# <kafka home>/system_test/broker_failure $ bin/run-test.sh
-#
-# The expected output is given in bin/expected.out.
-#
-# In the event of failure, by default the brokers and zookeepers
-# remain running to make it easier to debug the issue - hit Ctrl-C
-# to shut them down.
-# =================================================================
-
-readonly base_dir=$(dirname $0)/..
-readonly test_start_time="$(date +%s)"
-
-readonly num_msg_per_batch=500
-readonly batches_per_iteration=5
-readonly num_iterations=1
-
-readonly zk_source_port=2181
-readonly zk_mirror_port=2182
-
-readonly topic_prefix=test
-readonly max_topic_id=2
-readonly unbalanced_start_id=2
-readonly consumer_grp=group1
+
+# ====================================
+# Do not change the followings
+# (keep this section at the beginning
+# of this script)
+# ====================================
+readonly system_test_root=$(dirname $0)/../.. # path of <kafka install>/system_test
+readonly common_dir=${system_test_root}/common # common util scripts for system_test
+source ${common_dir}/util.sh # include the util script
+
+readonly base_dir=$(dirname $0)/.. # the base dir of this test suite
+readonly test_start_time="$(date +%s)" # time starting this test
+readonly bounce_source_id=1
+readonly bounce_mir_mkr_id=2
+readonly bounce_target_id=3
+readonly log4j_prop_file=$base_dir/config/log4j.properties
+
+iter=1 # init a counter to keep track of iterations
+num_iterations=5 # total no. of iterations to run
+svr_to_bounce=0 # servers to bounce: 1-source 2-mirror_maker 3-target
+ # 12 - source & mirror_maker
+ # 13 - source & target
+
+# ====================================
+# No need to change the following
+# configurations in most cases
+# ====================================
+readonly zk_source_port=2181 # source zk port
+readonly zk_target_port=2182 # target zk port
+readonly test_topic=test01 # topic used in this test
+readonly consumer_grp=group1 # consumer group
readonly source_console_consumer_grp=source
-readonly mirror_console_consumer_grp=mirror
+readonly target_console_consumer_grp=target
readonly message_size=5000
-
-# sleep time between each batch of messages published
-# from producer - it will be randomly generated
-# within the range of sleep_min & sleep_max
-readonly sleep_min=3
-readonly sleep_max=3
-
-# requires same no. of property files such as:
-# $base_dir/config/server_source{1..4}.properties
-readonly num_kafka_source_server=4
-
-# requires same no. of property files such as:
-# $base_dir/config/server_target{1..3}.properties
-readonly num_kafka_target_server=3
-
-readonly wait_time_after_killing_broker=0
-readonly wait_time_after_restarting_broker=5
-
-readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094"
-readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093"
-
-background_producer_pid_1=
-background_producer_pid_2=
-
-no_bouncing=$#
-
-iter=1
-abort_test=false
-
+readonly console_consumer_timeout_ms=15000
+readonly num_kafka_source_server=4 # requires same no. of property files such as:
+ # $base_dir/config/server_source{1..4}.properties
+readonly num_kafka_target_server=3 # requires same no. of property files such as:
+ # $base_dir/config/server_target{1..3}.properties
+readonly num_kafka_mirror_maker=3 # any values greater than 0
+readonly wait_time_after_killing_broker=0 # wait after broker is stopped but before starting again
+readonly wait_time_after_restarting_broker=10
+
+# ====================================
+# Change the followings as needed
+# ====================================
+num_msg_per_batch=500 # no. of msg produced in each calling of ProducerPerformance
+producer_sleep_min=5 # min & max sleep time (in sec) between each
+producer_sleep_max=5 # batch of messages sent from producer
+
+# ====================================
+# zookeeper
+# ====================================
pid_zk_source=
pid_zk_target=
+zk_log4j_log=
+# ====================================
+# kafka source
+# ====================================
kafka_source_pids=
kafka_source_prop_files=
kafka_source_log_files=
+kafka_topic_creation_log_file=$base_dir/kafka_topic_creation.log
+kafka_log4j_log=
+# ====================================
+# kafka target
+# ====================================
kafka_target_pids=
kafka_target_prop_files=
kafka_target_log_files=
+
+# ====================================
+# mirror maker
+# ====================================
+kafka_mirror_maker_pids=
+kafka_mirror_maker_log_files=
+consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
mirror_producer_prop_files=
+# ====================================
+# console consumer source
+# ====================================
console_consumer_source_pid=
-console_consumer_mirror_pid=
-
console_consumer_source_log=$base_dir/console_consumer_source.log
-console_consumer_mirror_log=$base_dir/console_consumer_mirror.log
-producer_performance_log=$base_dir/producer_performance.log
-
console_consumer_source_crc_log=$base_dir/console_consumer_source_crc.log
console_consumer_source_crc_sorted_log=$base_dir/console_consumer_source_crc_sorted.log
console_consumer_source_crc_sorted_uniq_log=$base_dir/console_consumer_source_crc_sorted_uniq.log
-console_consumer_mirror_crc_log=$base_dir/console_consumer_mirror_crc.log
-console_consumer_mirror_crc_sorted_log=$base_dir/console_consumer_mirror_crc_sorted.log
-console_consumer_mirror_crc_sorted_uniq_log=$base_dir/console_consumer_mirror_crc_sorted_uniq.log
-
+# ====================================
+# console consumer target
+# ====================================
+console_consumer_target_pid=
+console_consumer_target_log=$base_dir/console_consumer_target.log
+console_consumer_target_crc_log=$base_dir/console_consumer_target_crc.log
+console_consumer_target_crc_sorted_log=$base_dir/console_consumer_target_crc_sorted.log
+console_consumer_target_crc_sorted_uniq_log=$base_dir/console_consumer_target_crc_sorted_uniq.log
+
+# ====================================
+# producer
+# ====================================
+background_producer_pid=
+producer_performance_log=$base_dir/producer_performance.log
producer_performance_crc_log=$base_dir/producer_performance_crc.log
producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log
producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log
+tmp_file_to_stop_background_producer=/tmp/tmp_file_to_stop_background_producer
-consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log
-
-consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
+# ====================================
+# test reports
+# ====================================
checksum_diff_log=$base_dir/checksum_diff.log
-info() {
- echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
-}
-
-info_no_newline() {
- echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*"
-}
+# ====================================
+# initialize prop and log files
+# ====================================
initialize() {
for ((i=1; i<=$num_kafka_target_server; i++))
do
kafka_target_prop_files[${i}]=$base_dir/config/server_target${i}.properties
kafka_target_log_files[${i}]=$base_dir/kafka_target${i}.log
- mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties
+ kafka_mirror_maker_log_files[${i}]=$base_dir/kafka_mirror_maker${i}.log
done
for ((i=1; i<=$num_kafka_source_server; i++))
@@ -166,109 +148,69 @@ initialize() {
kafka_source_prop_files[${i}]=$base_dir/config/server_source${i}.properties
kafka_source_log_files[${i}]=$base_dir/kafka_source${i}.log
done
-}
-# =========================================
-# get_random_range - return a random number
-# between the lower & upper bounds
-# usage:
-# get_random_range $lower $upper
-# random_no=$?
-# =========================================
-get_random_range() {
- lo=$1
- up=$2
- range=$(($up - $lo + 1))
-
- return $(($(($RANDOM % range)) + $lo))
-}
-
-verify_consumer_rebalancing() {
-
- info "Verifying consumer rebalancing operation"
-
- CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \
- kafka.tools.VerifyConsumerRebalance \
- --zk.connect=localhost:2181 \
- --group $consumer_grp`
- echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log
-
- REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1`
- # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE"
- REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"`
- info "REBALANCE_STATUS: $REBALANCE_STATUS"
-
- if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then
- info "setting abort_test to true due to Rebalance operation failed"
- abort_test="true"
- fi
+ for ((i=1; i<=$num_kafka_mirror_maker; i++))
+ do
+ mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties
+ done
+
+ zk_log4j_log=`grep "log4j.appender.zookeeperAppender.File=" $log4j_prop_file | awk -F '=' '{print $2}'`
+ kafka_log4j_log=`grep "log4j.appender.kafkaAppender.File=" $log4j_prop_file | awk -F '=' '{print $2}'`
}
-wait_for_zero_consumer_lags() {
+# =========================================
+# cleanup
+# =========================================
+cleanup() {
+ info "cleaning up"
- topic_id=$1
+ rm -rf $tmp_file_to_stop_background_producer
+ rm -rf $kafka_topic_creation_log_file
- # no of times to check for zero lagging
- no_of_zero_to_verify=3
+ rm -rf /tmp/zookeeper_source
+ rm -rf /tmp/zookeeper_target
- while [ 'x' == 'x' ]
- do
- TOTAL_LAG=0
- CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
- --group $consumer_grp --zkconnect localhost:$zk_source_port \
- --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+ rm -rf /tmp/kafka-source{1..4}-logs
+ rm -rf /tmp/kafka-target{1..3}-logs
- for lag in $CONSUMER_LAGS;
- do
- TOTAL_LAG=$(($TOTAL_LAG + $lag))
- done
+ rm -rf $zk_log4j_log
+ rm -rf $kafka_log4j_log
- info "mirror TOTAL_LAG = $TOTAL_LAG"
- if [ $TOTAL_LAG -eq 0 ]; then
- if [ $no_of_zero_to_verify -eq 0 ]; then
- echo
- return 0
- fi
- no_of_zero_to_verify=$(($no_of_zero_to_verify - 1))
- fi
- sleep 1
+ for ((i=1; i<=$num_kafka_target_server; i++))
+ do
+ rm -rf ${kafka_target_log_files[${i}]}
+ rm -rf ${kafka_mirror_maker_log_files[${i}]}
done
-}
-
-wait_for_zero_source_console_consumer_lags() {
- topic_id=$1
+ rm -f $base_dir/zookeeper_source.log
+ rm -f $base_dir/zookeeper_target.log
+ rm -f $base_dir/kafka_source{1..4}.log
- # no of times to check for zero lagging
- no_of_zero_to_verify=3
+ rm -f $producer_performance_log
+ rm -f $producer_performance_crc_log
+ rm -f $producer_performance_crc_sorted_log
+ rm -f $producer_performance_crc_sorted_uniq_log
- while [ 'x' == 'x' ]
- do
- TOTAL_LAG=0
- CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
- --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \
- --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+ rm -f $console_consumer_target_log
+ rm -f $console_consumer_source_log
+ rm -f $console_consumer_target_crc_log
+ rm -f $console_consumer_source_crc_log
- for lag in $CONSUMER_LAGS;
- do
- TOTAL_LAG=$(($TOTAL_LAG + $lag))
- done
+ rm -f $checksum_diff_log
- info "source console consumer TOTAL_LAG = $TOTAL_LAG"
- if [ $TOTAL_LAG -eq 0 ]; then
- if [ $no_of_zero_to_verify -eq 0 ]; then
- echo
- return 0
- fi
- no_of_zero_to_verify=$(($no_of_zero_to_verify - 1))
- fi
- sleep 1
- done
+ rm -f $console_consumer_target_crc_sorted_log
+ rm -f $console_consumer_source_crc_sorted_log
+ rm -f $console_consumer_target_crc_sorted_uniq_log
+ rm -f $console_consumer_source_crc_sorted_uniq_log
}
-wait_for_zero_mirror_console_consumer_lags() {
+# =========================================
+# wait_for_zero_consumer_lags
+# =========================================
+wait_for_zero_consumer_lags() {
- topic_id=$1
+ this_group_name=$1
+ this_zk_port=$2
# no of times to check for zero lagging
no_of_zero_to_verify=3
@@ -277,8 +219,10 @@ wait_for_zero_mirror_console_consumer_la
do
TOTAL_LAG=0
CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
- --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \
- --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+ --group $target_console_consumer_grp \
+ --zkconnect localhost:$zk_target_port \
+ --topic $test_topic \
+ | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
for lag in $CONSUMER_LAGS;
do
@@ -297,58 +241,25 @@ wait_for_zero_mirror_console_consumer_la
done
}
-kill_child_processes() {
- isTopmost=$1
- curPid=$2
- childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
-
- for childPid in $childPids
- do
- kill_child_processes 0 $childPid
- done
- if [ $isTopmost -eq 0 ]; then
- kill -15 $curPid 2> /dev/null
- fi
-}
-
-cleanup() {
- info "cleaning up"
-
- rm -rf /tmp/zookeeper_source
- rm -rf /tmp/zookeeper_target
-
- rm -rf /tmp/kafka-source{1..4}-logs
- rm -rf /tmp/kafka-target{1..3}-logs
-
- for ((i=1; i<=$num_kafka_target_server; i++))
- do
- rm -rf ${kafka_target_log_files[${i}]}
- done
-
- rm -f $base_dir/zookeeper_source.log
- rm -f $base_dir/zookeeper_target.log
- rm -f $base_dir/kafka_source{1..4}.log
-
- rm -f $producer_performance_log
- rm -f $producer_performance_crc_log
- rm -f $producer_performance_crc_sorted_log
- rm -f $producer_performance_crc_sorted_uniq_log
-
- rm -f $console_consumer_mirror_log
- rm -f $console_consumer_source_log
- rm -f $console_consumer_mirror_crc_log
- rm -f $console_consumer_source_crc_log
-
- rm -f $checksum_diff_log
-
- rm -f $console_consumer_mirror_crc_sorted_log
- rm -f $console_consumer_source_crc_sorted_log
- rm -f $console_consumer_mirror_crc_sorted_uniq_log
- rm -f $console_consumer_source_crc_sorted_uniq_log
-
- rm -f $consumer_rebalancing_log
+# =========================================
+# create_topic
+# =========================================
+create_topic() {
+ this_topic_to_create=$1
+ this_zk_conn_str=$2
+ this_replica_factor=$3
+
+ info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]"
+ $base_dir/../../bin/kafka-create-topic.sh \
+ --topic $this_topic_to_create \
+ --zookeeper $this_zk_conn_str \
+ --replica $this_replica_factor \
+ 2> $kafka_topic_creation_log_file
}
+# =========================================
+# start_zk
+# =========================================
start_zk() {
info "starting zookeepers"
@@ -363,6 +274,9 @@ start_zk() {
pid_zk_target=$!
}
+# =========================================
+# start_source_servers_cluster
+# =========================================
start_source_servers_cluster() {
info "starting source cluster"
@@ -372,112 +286,129 @@ start_source_servers_cluster() {
done
}
+# =========================================
+# start_source_server
+# =========================================
start_source_server() {
s_idx=$1
$base_dir/bin/kafka-run-class.sh kafka.Kafka \
${kafka_source_prop_files[$s_idx]} \
- 2>&1 >> ${kafka_source_log_files[$s_idx]} & # append log msg after restarting
+ 2>&1 >> ${kafka_source_log_files[$s_idx]} &
kafka_source_pids[${s_idx}]=$!
info " -> kafka_source_pids[$s_idx]: ${kafka_source_pids[$s_idx]}"
}
+# =========================================
+# start_target_servers_cluster
+# =========================================
start_target_servers_cluster() {
info "starting mirror cluster"
for ((i=1; i<=$num_kafka_target_server; i++))
do
- start_embedded_consumer_server $i
+ start_target_server $i
done
}
-start_embedded_consumer_server() {
+# =========================================
+# start_target_server
+# =========================================
+start_target_server() {
s_idx=$1
$base_dir/bin/kafka-run-class.sh kafka.Kafka \
${kafka_target_prop_files[${s_idx}]} \
- $consumer_prop_file \
- ${mirror_producer_prop_files[${s_idx}]} \
- 2>&1 >> ${kafka_target_log_files[${s_idx}]} & # append log msg after restarting
+ 2>&1 >> ${kafka_target_log_files[${s_idx}]} &
kafka_target_pids[$s_idx]=$!
info " -> kafka_target_pids[$s_idx]: ${kafka_target_pids[$s_idx]}"
}
-start_console_consumer_for_source_producer() {
+# =========================================
+# start_target_mirror_maker
+# =========================================
+start_target_mirror_maker() {
+ info "starting mirror maker"
- topic_id=$1
+ for ((i=1; i<=$num_kafka_mirror_maker; i++))
+ do
+ start_mirror_maker $i
+ done
+}
- info "starting console consumers for source producer on topic id [$topic_id]"
+# =========================================
+# start_mirror_maker
+# =========================================
+start_mirror_maker() {
+ s_idx=$1
- $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
- --zookeeper localhost:$zk_source_port \
- --topic ${topic_prefix}_${topic_id} \
- --group $source_console_consumer_grp \
- --from-beginning --consumer-timeout-ms 5000 \
- --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
- --property topic=${topic_prefix}_${topic_id} \
- 2>&1 >> ${console_consumer_source_log}
+ $base_dir/bin/kafka-run-class.sh kafka.tools.MirrorMaker \
+ --consumer.config $consumer_prop_file \
+ --producer.config ${mirror_producer_prop_files[${s_idx}]} \
+ --whitelist=\".*\" \
+ 2>&1 >> ${kafka_mirror_maker_log_files[$s_idx]} &
+ kafka_mirror_maker_pids[${s_idx}]=$!
+
+ info " -> kafka_mirror_maker_pids[$s_idx]: ${kafka_mirror_maker_pids[$s_idx]}"
}
-start_console_consumer_for_mirror_producer() {
+# =========================================
+# start_console_consumer
+# =========================================
+start_console_consumer() {
- topic_id=$1
+ this_consumer_grp=$1
+ this_consumer_zk_port=$2
+ this_consumer_log=$3
- info "starting console consumers for mirroring producer on topic id [$topic_id]"
+ info "starting console consumers for $this_consumer_grp"
$base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
- --zookeeper localhost:$zk_mirror_port \
- --topic ${topic_prefix}_${topic_id} \
- --group $mirror_console_consumer_grp \
- --from-beginning --consumer-timeout-ms 5000 \
+ --zookeeper localhost:$this_consumer_zk_port \
+ --topic $test_topic \
+ --group $this_consumer_grp \
+ --from-beginning \
+ --consumer-timeout-ms $console_consumer_timeout_ms \
--formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
- --property topic=${topic_prefix}_${topic_id} \
- 2>&1 >> ${console_consumer_mirror_log}
-}
+ 2>&1 > ${this_consumer_log} &
+ console_consumer_pid=$!
-consume_source_producer_messages() {
- consumer_counter=1
- while [ $consumer_counter -le $max_topic_id ]
- do
- start_console_consumer_for_source_producer $consumer_counter
- consumer_counter=$(( $consumer_counter + 1 ))
- done
+ info " -> console consumer pid: $console_consumer_pid"
}
-consume_mirror_producer_messages() {
- consumer_counter=1
- while [ $consumer_counter -le $max_topic_id ]
- do
- start_console_consumer_for_mirror_producer $consumer_counter
- consumer_counter=$(( $consumer_counter + 1 ))
- done
+# =========================================
+# force_shutdown_background_producer
+# - to be called when user press Ctrl-C
+# =========================================
+force_shutdown_background_producer() {
+ info "force shutting down producer"
+ `ps auxw | grep "run\-test\|ProducerPerformance" | grep -v grep | awk '{print $2}' | xargs kill -9`
}
-shutdown_producer() {
- info "shutting down producer"
- if [ "x${background_producer_pid_1}" != "x" ]; then
- # kill_child_processes 0 ${background_producer_pid_1};
- kill -TERM ${background_producer_pid_1} 2> /dev/null;
- fi
-
- if [ "x${background_producer_pid_2}" != "x" ]; then
- # kill_child_processes 0 ${background_producer_pid_2};
- kill -TERM ${background_producer_pid_2} 2> /dev/null;
- fi
+# =========================================
+# force_shutdown_consumer
+# - to be called when user press Ctrl-C
+# =========================================
+force_shutdown_consumer() {
+ info "force shutting down consumer"
+ `ps auxw | grep ChecksumMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9`
}
+# =========================================
+# shutdown_servers
+# =========================================
shutdown_servers() {
- info "shutting down mirror console consumer"
- if [ "x${console_consumer_mirror_pid}" != "x" ]; then
- kill_child_processes 0 ${console_consumer_mirror_pid};
- fi
- info "shutting down source console consumer"
- if [ "x${console_consumer_source_pid}" != "x" ]; then
- kill_child_processes 0 ${console_consumer_source_pid};
- fi
+ info "shutting down mirror makers"
+ for ((i=1; i<=$num_kafka_mirror_maker; i++))
+ do
+ #info "stopping mm pid: ${kafka_mirror_maker_pids[$i]}"
+ if [ "x${kafka_mirror_maker_pids[$i]}" != "x" ]; then
+ kill_child_processes 0 ${kafka_mirror_maker_pids[$i]};
+ fi
+ done
info "shutting down target servers"
for ((i=1; i<=$num_kafka_target_server; i++))
@@ -500,66 +431,66 @@ shutdown_servers() {
if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
}
+# =========================================
+# start_background_producer
+# =========================================
start_background_producer() {
- bkrinfo_str=$1
- start_topic_id=$2
- end_topic_id=$3
+
+ topic=$1
batch_no=0
- topic_id=${start_topic_id}
- while [ 'x' == 'x' ]
+ while [ ! -e $tmp_file_to_stop_background_producer ]
do
sleeptime=
- get_random_range $sleep_min $sleep_max
+ get_random_range $producer_sleep_min $producer_sleep_max
sleeptime=$?
batch_no=$(($batch_no + 1))
- if [ $topic_id -gt $end_topic_id ]; then
- topic_id=${start_topic_id}
- fi
-
+ info "producing $num_msg_per_batch messages on topic '$topic'"
$base_dir/bin/kafka-run-class.sh \
kafka.perf.ProducerPerformance \
- --brokerinfo $bkrinfo_str \
- --topic ${topic_prefix}_${topic_id} \
+ --brokerinfo zk.connect=localhost:2181 \
+ --topic $topic \
--messages $num_msg_per_batch \
--message-size $message_size \
--batch-size 50 \
--vary-message-size \
--threads 1 \
- --reporting-interval $num_msg_per_batch --async \
+ --reporting-interval $num_msg_per_batch \
+ --async \
2>&1 >> $base_dir/producer_performance.log # appending all producers' msgs
- topic_id=$(( $topic_id + 1 ))
-
sleep $sleeptime
done
}
+# =========================================
+# cmp_checksum
+# =========================================
cmp_checksum() {
cmp_result=0
- grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
- grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
- grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log
+ grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
+ grep ^checksum $console_consumer_target_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_target_crc_log
+ grep checksum $producer_performance_log | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}' > $producer_performance_crc_log
- sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log
+ sort $console_consumer_target_crc_log > $console_consumer_target_crc_sorted_log
sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log
sort $producer_performance_crc_log > $producer_performance_crc_sorted_log
- sort -u $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_uniq_log
+ sort -u $console_consumer_target_crc_log > $console_consumer_target_crc_sorted_uniq_log
sort -u $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_uniq_log
sort -u $producer_performance_crc_log > $producer_performance_crc_sorted_uniq_log
msg_count_from_source_consumer=`cat $console_consumer_source_crc_log | wc -l | tr -d ' '`
uniq_msg_count_from_source_consumer=`cat $console_consumer_source_crc_sorted_uniq_log | wc -l | tr -d ' '`
- msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_log | wc -l | tr -d ' '`
- uniq_msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_sorted_uniq_log | wc -l | tr -d ' '`
+ msg_count_from_mirror_consumer=`cat $console_consumer_target_crc_log | wc -l | tr -d ' '`
+ uniq_msg_count_from_mirror_consumer=`cat $console_consumer_target_crc_sorted_uniq_log | wc -l | tr -d ' '`
uniq_msg_count_from_producer=`cat $producer_performance_crc_sorted_uniq_log | wc -l | tr -d ' '`
@@ -567,17 +498,19 @@ cmp_checksum() {
duplicate_msg_in_producer=$(( $total_msg_published - $uniq_msg_count_from_producer ))
- crc_only_in_mirror_consumer=`comm -23 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
- crc_only_in_source_consumer=`comm -13 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
- crc_common_in_both_consumer=`comm -12 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
+ crc_only_in_mirror_consumer=`comm -23 $console_consumer_target_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
+ crc_only_in_source_consumer=`comm -13 $console_consumer_target_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
+ crc_common_in_both_consumer=`comm -12 $console_consumer_target_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
crc_only_in_producer=`comm -23 $producer_performance_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
- duplicate_mirror_crc=`comm -23 $console_consumer_mirror_crc_sorted_log $console_consumer_mirror_crc_sorted_uniq_log`
+ duplicate_mirror_crc=`comm -23 $console_consumer_target_crc_sorted_log $console_consumer_target_crc_sorted_uniq_log`
no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \
+ $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \
2*$duplicate_msg_in_producer ))
+ source_mirror_uniq_msg_diff=$(($uniq_msg_count_from_source_consumer - $uniq_msg_count_from_mirror_consumer))
+
echo ""
echo "========================================================"
echo "no. of messages published : $total_msg_published"
@@ -587,8 +520,7 @@ cmp_checksum() {
echo "mirror consumer msg rec'd : $msg_count_from_mirror_consumer"
echo "mirror consumer unique msg rec'd : $uniq_msg_count_from_mirror_consumer"
echo "total source/mirror duplicate msg : $no_of_duplicate_msg"
- echo "source/mirror uniq msg count diff : $(($uniq_msg_count_from_source_consumer - \
- $uniq_msg_count_from_mirror_consumer))"
+ echo "source/mirror uniq msg count diff : $source_mirror_uniq_msg_diff"
echo "========================================================"
echo "(Please refer to $checksum_diff_log for more details)"
echo ""
@@ -613,94 +545,74 @@ cmp_checksum() {
echo "========================================================" >> $checksum_diff_log
echo "${duplicate_mirror_crc}" >> $checksum_diff_log
- topic_chksum_counter=1
- while [ $topic_chksum_counter -le $max_topic_id ]
- do
- # get producer topic counts
- this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log`
- echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
-
- topic_chksum_counter=$(($topic_chksum_counter + 1))
- done
- echo
-
- topic_chksum_counter=1
- while [ $topic_chksum_counter -le $max_topic_id ]
- do
- this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log`
- echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
-
- topic_chksum_counter=$(($topic_chksum_counter + 1))
- done
- echo
-
- topic_chksum_counter=1
- while [ $topic_chksum_counter -le $max_topic_id ]
- do
- this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log`
- echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
-
- topic_chksum_counter=$(($topic_chksum_counter + 1))
- done
+ echo "================="
+ if [[ $source_mirror_uniq_msg_diff -eq 0 && $uniq_msg_count_from_source_consumer -gt 0 ]]; then
+ echo "## Test PASSED"
+ else
+ echo "## Test FAILED"
+ fi
+ echo "================="
echo
return $cmp_result
}
+# =========================================
+# start_test
+# =========================================
start_test() {
+ echo
+ info "==========================================================="
+ info "#### Starting Kafka Broker / Mirror Maker Failure Test #### (v1.0)"
+ info "==========================================================="
+ echo
+
start_zk
sleep 2
+
start_source_servers_cluster
sleep 2
+
+# create_topic $test_topic localhost:$zk_source_port 1
+# sleep 2
+
start_target_servers_cluster
sleep 2
- start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) &
- background_producer_pid_1=$!
+ start_target_mirror_maker
+ sleep 2
- info "=========================================="
- info "Started background producer pid [${background_producer_pid_1}]"
- info "=========================================="
+ start_background_producer $test_topic &
+ background_producer_pid=$!
- sleep 10
+ info "Started background producer pid [${background_producer_pid}]"
+ sleep 5
- start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id &
- background_producer_pid_2=$!
-
- info "=========================================="
- info "Started background producer pid [${background_producer_pid_2}]"
- info "=========================================="
-
- sleep 10
-
- verify_consumer_rebalancing
-
- info "abort_test: [${abort_test}]"
- if [ "${abort_test}_x" == "true_x" ]; then
- info "aborting test"
- iter=$((${num_iterations} + 1))
- fi
-
+ # loop for no. of iterations specified in $num_iterations
while [ $num_iterations -ge $iter ]
do
- echo
- info "=========================================="
- info "Iteration $iter of ${num_iterations}"
- info "=========================================="
+ # if $svr_to_bounce is '0', it means no bouncing
+ if [[ $num_iterations -ge $iter && $svr_to_bounce -gt 0 ]]; then
+ idx=
- # terminate the broker if not the last iteration:
- if [[ $num_iterations -gt $iter && $no_bouncing -eq 0 ]]; then
+ # check which type of broker bouncing is requested: source, mirror_maker or target
- idx=
+ # $svr_to_bounce contains $bounce_target_id - eg. '3', '123', ... etc
+ svr_idx=`expr index $svr_to_bounce $bounce_target_id`
+ if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then
+ echo
+ info "=========================================="
+ info "Iteration $iter of ${num_iterations}"
+ info "=========================================="
- if [ $(( $iter % 2 )) -eq 0 ]; then
- # even iterations -> bounce target kafka borker
+ # bounce target kafka broker
get_random_range 1 $num_kafka_target_server
idx=$?
+
if [ "x${kafka_target_pids[$idx]}" != "x" ]; then
echo
- info "#### Bouncing kafka TARGET broker ####"
+ info "#### Bouncing Kafka TARGET Broker ####"
info "terminating kafka target[$idx] with process id ${kafka_target_pids[$idx]}"
kill_child_processes 0 ${kafka_target_pids[$idx]}
@@ -709,19 +621,58 @@ start_test() {
sleep $wait_time_after_killing_broker
info "starting kafka target server"
- start_embedded_consumer_server $idx
+ start_target_server $idx
+ fi
+ iter=$(($iter+1))
+ info "sleeping for ${wait_time_after_restarting_broker}s"
+ sleep $wait_time_after_restarting_broker
+ fi
+
+ # $svr_to_bounce contains $bounce_mir_mkr_id - eg. '2', '123', ... etc
+ svr_idx=`expr index $svr_to_bounce $bounce_mir_mkr_id`
+ if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then
+ echo
+ info "=========================================="
+ info "Iteration $iter of ${num_iterations}"
+ info "=========================================="
+
+ # bounce mirror maker
+ get_random_range 1 $num_kafka_mirror_maker
+ idx=$?
+
+ if [ "x${kafka_mirror_maker_pids[$idx]}" != "x" ]; then
+ echo
+ info "#### Bouncing Kafka Mirror Maker ####"
- info "sleeping for ${wait_time_after_restarting_broker}s"
- sleep $wait_time_after_restarting_broker
+ info "terminating kafka mirror maker [$idx] with process id ${kafka_mirror_maker_pids[$idx]}"
+ kill_child_processes 0 ${kafka_mirror_maker_pids[$idx]}
+
+ info "sleeping for ${wait_time_after_killing_broker}s"
+ sleep $wait_time_after_killing_broker
+
+ info "starting kafka mirror maker"
+ start_mirror_maker $idx
fi
- else
- # odd iterations -> bounce source kafka broker
+ iter=$(($iter+1))
+ info "sleeping for ${wait_time_after_restarting_broker}s"
+ sleep $wait_time_after_restarting_broker
+ fi
+
+ # $svr_to_bounce contains $bounce_source_id - eg. '1', '123', ... etc
+ svr_idx=`expr index $svr_to_bounce $bounce_source_id`
+ if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then
+ echo
+ info "=========================================="
+ info "Iteration $iter of ${num_iterations}"
+ info "=========================================="
+
+ # bounce source kafka broker
get_random_range 1 $num_kafka_source_server
idx=$?
if [ "x${kafka_source_pids[$idx]}" != "x" ]; then
echo
- info "#### Bouncing kafka SOURCE broker ####"
+ info "#### Bouncing Kafka SOURCE Broker ####"
info "terminating kafka source[$idx] with process id ${kafka_source_pids[$idx]}"
kill_child_processes 0 ${kafka_source_pids[$idx]}
@@ -731,69 +682,142 @@ start_test() {
info "starting kafka source server"
start_source_server $idx
-
- info "sleeping for ${wait_time_after_restarting_broker}s"
- sleep $wait_time_after_restarting_broker
fi
- fi
-
- verify_consumer_rebalancing
-
- info "abort_test: [${abort_test}]"
- if [ "${abort_test}_x" == "true_x" ]; then
- info "aborting test"
- iter=$((${num_iterations} + 1))
- fi
-
+ iter=$(($iter+1))
+ info "sleeping for ${wait_time_after_restarting_broker}s"
+ sleep $wait_time_after_restarting_broker
+ fi
else
+ echo
+ info "=========================================="
+ info "Iteration $iter of ${num_iterations}"
+ info "=========================================="
+
info "No bouncing performed"
+ iter=$(($iter+1))
+ info "sleeping for ${wait_time_after_restarting_broker}s"
+ sleep $wait_time_after_restarting_broker
fi
-
- info "sleeping for 10 sec"
- sleep 10
-
- iter=$(($iter+1))
done
+ # notify background producer to stop
+ touch $tmp_file_to_stop_background_producer
+
echo
info "Tests completed. Waiting for consumers to catch up "
-
- shutdown_producer
- wait_for_zero_consumer_lags
+ # =======================================================
+ # remove the following 'sleep 30' when KAFKA-313 is fixed
+ # =======================================================
+ info "sleeping 30 sec"
+ sleep 30
}
+# =========================================
+# print_usage
+# =========================================
+print_usage() {
+ echo
+ echo "Error : invalid no. of arguments"
+ echo "Usage : $0 -n <no. of iterations> -s <servers to bounce>"
+ echo
+ echo " num of iterations - the number of iterations that the test runs"
+ echo
+ echo " servers to bounce - the servers to be bounced in a round-robin fashion"
+ echo " Values of the servers:"
+ echo " 0 - no bouncing"
+ echo " 1 - source broker"
+ echo " 2 - mirror maker"
+ echo " 3 - target broker"
+ echo " Example:"
+ echo " * To bounce only mirror maker and target broker"
+ echo " in turns, enter the value 23"
+ echo " * To bounce only mirror maker, enter the value 2"
+ echo " * To run the test without bouncing, enter 0"
+ echo
+ echo "Usage Example : $0 -n 10 -s 12"
+ echo " (run 10 iterations and bounce source broker (1) + mirror maker (2) in turn)"
+ echo
+}
-# =====================
-# main test begins here
-# =====================
-echo
-info "============================================"
-info "#### Starting Kafka Broker Failure Test ####"
-info "============================================"
-echo
+# =========================================
+#
+# Main test begins here
+#
+# =========================================
+# get command line arguments
+while getopts "hb:i:n:s:x:" opt
+do
+ case $opt in
+ b)
+ num_msg_per_batch=$OPTARG
+ ;;
+ h)
+ print_usage
+ exit
+ ;;
+ i)
+ producer_sleep_min=$OPTARG
+ ;;
+ n)
+ num_iterations=$OPTARG
+ ;;
+ s)
+ svr_to_bounce=$OPTARG
+ ;;
+ x)
+ producer_sleep_max=$OPTARG
+ ;;
+ ?)
+ print_usage
+ exit
+ ;;
+ esac
+done
+
+# initialize and cleanup
initialize
cleanup
sleep 5
# Ctrl-c trap. Catches INT signal
-trap "shutdown_producer; shutdown_servers; cmp_checksum; exit 0" INT
+trap "shutdown_servers; force_shutdown_consumer; force_shutdown_background_producer; cmp_checksum; exit 0" INT
+# starting the test
start_test
-consume_source_producer_messages
-consume_mirror_producer_messages
+# starting consumer to consume data in source
+start_console_consumer $source_console_consumer_grp $zk_source_port $console_consumer_source_log
+
+# starting consumer to consume data in target
+start_console_consumer $target_console_consumer_grp $zk_target_port $console_consumer_target_log
-wait_for_zero_source_console_consumer_lags
-wait_for_zero_mirror_console_consumer_lags
+# wait for zero source consumer lags
+wait_for_zero_consumer_lags $source_console_consumer_grp $zk_source_port
-verify_consumer_rebalancing
+# wait for zero target consumer lags
+wait_for_zero_consumer_lags $target_console_consumer_grp $zk_target_port
+
+# =======================================================
+# remove the following 'sleep 30' when KAFKA-313 is fixed
+# =======================================================
+info "sleeping 30 sec"
+sleep 30
shutdown_servers
cmp_checksum
result=$?
+# ===============================================
+# Report the time taken
+# ===============================================
+test_end_time="$(date +%s)"
+total_test_time_sec=$(( $test_end_time - $test_start_time ))
+total_test_time_min=$(( $total_test_time_sec / 60 ))
+info "Total time taken: $total_test_time_min min for $num_iterations iterations"
+echo
+
exit $result
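
Note on the checksum comparison in cmp_checksum above: the three comm invocations split two sorted, de-duplicated checksum lists into "only in target", "only in source" and "common to both". The following is a minimal sketch of that pattern only, not the test's actual code. The function name classify_checksums, the temporary files and the sample checksum values are all made up for illustration.

```shell
#!/bin/bash
# Illustrative sketch: classify checksums found in two files.
# comm requires both inputs to be sorted; sort -u also drops duplicates,
# which mirrors the *_crc_sorted_uniq_log files used by cmp_checksum.
classify_checksums() {
    local target_sorted source_sorted
    target_sorted=$(mktemp)
    source_sorted=$(mktemp)
    sort -u "$1" > "$target_sorted"
    sort -u "$2" > "$source_sorted"

    # -23 keeps column 1 (lines only in the first file)
    echo "only_in_target: $(comm -23 "$target_sorted" "$source_sorted" | paste -sd' ' -)"
    # -13 keeps column 2 (lines only in the second file)
    echo "only_in_source: $(comm -13 "$target_sorted" "$source_sorted" | paste -sd' ' -)"
    # -12 keeps column 3 (lines present in both files)
    echo "common_in_both: $(comm -12 "$target_sorted" "$source_sorted" | paste -sd' ' -)"

    rm -f "$target_sorted" "$source_sorted"
}

# Demo with made-up checksum values
demo_target=$(mktemp)
demo_source=$(mktemp)
printf '30\n10\n20\n10\n' > "$demo_target"
printf '20\n40\n30\n' > "$demo_source"
classify_checksums "$demo_target" "$demo_source"
rm -f "$demo_target" "$demo_source"
```

A checksum that shows up under only_in_source corresponds to a message that reached the source cluster but was not seen on the target, which is the case the PASSED/FAILED check is meant to catch.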
Modified: incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties Fri Jul 6 20:44:51 2012
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=INFO, stdout, kafkaAppender
+log4j.rootLogger=INFO, stdout
# ====================================
# messages going to kafkaAppender
@@ -27,7 +27,7 @@ log4j.logger.org.apache.zookeeper=INFO,
# ====================================
# (comment out this line to redirect ZK-related messages to kafkaAppender
# to allow reading both Kafka and ZK debugging messages in a single file)
-#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
+log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
# ====================================
# stdout
@@ -73,6 +73,7 @@ log4j.additivity.org.apache.zookeeper=fa
log4j.logger.kafka.consumer=DEBUG
log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
+log4j.logger.kafka.tools.ConsumerOffsetChecker=DEBUG
# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties Fri Jul 6 20:44:51 2012
@@ -15,7 +15,8 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9081
+#broker.list=0:localhost:9081
+zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties Fri Jul 6 20:44:51 2012
@@ -15,7 +15,8 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9082
+#broker.list=0:localhost:9082
+zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties Fri Jul 6 20:44:51 2012
@@ -15,7 +15,8 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9083
+#broker.list=0:localhost:9083
+zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties Fri Jul 6 20:44:51 2012
@@ -75,7 +75,7 @@ log.default.flush.interval.ms=1000
log.default.flush.scheduler.interval.ms=1000
# set sendBufferSize
-send.buffer.size=10000
+send.buffer.size=500000
# set receiveBufferSize
-receive.buffer.size=10000
+receive.buffer.size=500000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties Fri Jul 6 20:44:51 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576
# the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties Fri Jul 6 20:44:51 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576
# the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties Fri Jul 6 20:44:51 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576
# the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties Fri Jul 6 20:44:51 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576
# the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties Fri Jul 6 20:44:51 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576
# the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties?rev=1358407&r1=1358406&r2=1358407&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties Fri Jul 6 20:44:51 2012
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576
# the maximum size of a log segment
-log.file.size=536870912
+log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
Added: incubator/kafka/trunk/system_test/common/util.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/common/util.sh?rev=1358407&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/common/util.sh (added)
+++ incubator/kafka/trunk/system_test/common/util.sh Fri Jul 6 20:44:51 2012
@@ -0,0 +1,168 @@
+#!/bin/bash
+
+# =========================================
+# info - print messages with timestamp
+# =========================================
+info() {
+ echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+# =========================================
+# info_no_newline - print messages with
+# timestamp without newline
+# =========================================
+info_no_newline() {
+ echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+# =========================================
+# get_random_range - return a random number
+# between the lower & upper bounds
+# usage:
+# get_random_range $lower $upper
+# random_no=$?
+# =========================================
+get_random_range() {
+ lo=$1
+ up=$2
+ range=$(($up - $lo + 1))
+
+ return $(($(($RANDOM % range)) + $lo))
+}
+
+# =========================================
+# kill_child_processes - terminate a
+# process and its child processes
+# =========================================
+kill_child_processes() {
+ isTopmost=$1
+ curPid=$2
+ childPids=$(ps a -o pid= -o ppid= | awk -v ppid="$curPid" '$2 == ppid {print $1}')
+
+ for childPid in $childPids
+ do
+ kill_child_processes 0 $childPid
+ done
+ if [ $isTopmost -eq 0 ]; then
+ kill -15 $curPid 2> /dev/null
+ fi
+}
+
+# =========================================================================
+# generate_kafka_properties_files -
+# 1. it takes the following arguments and generates server_{1..n}.properties
+# for the total no. of kafka brokers as specified in "num_server"; the
+# resulting properties files will be located at:
+# <kafka home>/system_test/<test suite>/config
+# 2. the default values in the generated properties files will be copied
+# from the settings in config/server.properties while the brokerid and
+# server port will be incremented accordingly
+# 3. to generate properties files with non-default values such as
+# "socket.send.buffer=2097152", simply add the property with new value
+# to the array variable kafka_properties_to_replace as shown below
+# =========================================================================
+generate_kafka_properties_files() {
+
+ test_suite_full_path=$1 # eg. <kafka home>/system_test/single_host_multi_brokers
+ num_server=$2 # total no. of brokers in the cluster
+ brokerid_to_start=$3 # this should be '0' in most cases
+ kafka_port_to_start=$4 # if 9091 is used, the rest would be 9092, 9093, ...
+
+ this_config_dir=${test_suite_full_path}/config
+
+ # info "test suite full path : $test_suite_full_path"
+ # info "broker id to start : $brokerid_to_start"
+ # info "kafka port to start : $kafka_port_to_start"
+ # info "num of server : $num_server"
+ # info "config dir : $this_config_dir"
+
+ # =============================================
+ # array of kafka property statements from
+ # the file 'server.properties' that need to
+ # be changed from their default values
+ # =============================================
+ # kafka_properties_to_replace # DO NOT uncomment this line !!
+
+ # =============================================
+ # Uncomment the following kafka properties
+ # array element as needed to change the default
+ # values. Other kafka properties can be added
+ # in a similar fashion.
+ # =============================================
+ # kafka_properties_to_replace[1]="socket.send.buffer=2097152"
+ # kafka_properties_to_replace[2]="socket.receive.buffer=2097152"
+ # kafka_properties_to_replace[3]="num.partitions=3"
+ # kafka_properties_to_replace[4]="max.socket.request.bytes=10485760"
+
+ server_properties=`cat ${this_config_dir}/server.properties`
+
+ for ((i=1; i<=$num_server; i++))
+ do
+ # ======================
+ # update misc properties
+ # ======================
+ for ((j=1; j<=${#kafka_properties_to_replace[@]}; j++))
+ do
+ keyword_to_replace=`echo ${kafka_properties_to_replace[${j}]} | awk -F '=' '{print $1}'`
+ string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+ # info "string to be replaced : [$string_to_be_replaced]"
+ # info "string to replace : [${kafka_properties_to_replace[${j}]}]"
+
+ echo "${server_properties}" | \
+ sed -e "s/${string_to_be_replaced}/${kafka_properties_to_replace[${j}]}/g" \
+ >${this_config_dir}/server_${i}.properties
+
+ server_properties=`cat ${this_config_dir}/server_${i}.properties`
+ done
+
+ # ======================
+ # update brokerid
+ # ======================
+ keyword_to_replace="brokerid="
+ string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+ brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
+ string_to_replace="${keyword_to_replace}${brokerid_idx}"
+ # info "string to be replaced : [${string_to_be_replaced}]"
+ # info "string to replace : [${string_to_replace}]"
+
+ echo "${server_properties}" | \
+ sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
+ >${this_config_dir}/server_${i}.properties
+
+ server_properties=`cat ${this_config_dir}/server_${i}.properties`
+
+ # ======================
+ # update kafka_port
+ # ======================
+ keyword_to_replace="port="
+ string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+ port_idx=$(( $kafka_port_to_start + $i - 1 ))
+ string_to_replace="${keyword_to_replace}${port_idx}"
+ # info "string to be replaced : [${string_to_be_replaced}]"
+ # info "string to replace : [${string_to_replace}]"
+
+ echo "${server_properties}" | \
+ sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
+ >${this_config_dir}/server_${i}.properties
+
+ server_properties=`cat ${this_config_dir}/server_${i}.properties`
+
+ # ======================
+ # update kafka_log dir
+ # ======================
+ keyword_to_replace="log.dir="
+ string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+ string_to_be_replaced=${string_to_be_replaced//\//\\\/}
+ string_to_replace="${keyword_to_replace}\/tmp\/kafka_server_${i}_logs"
+ # info "string to be replaced : [${string_to_be_replaced}]"
+ # info "string to replace : [${string_to_replace}]"
+
+ echo "${server_properties}" | \
+ sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
+ >${this_config_dir}/server_${i}.properties
+
+ server_properties=`cat ${this_config_dir}/server_${i}.properties`
+
+ done
+}
+
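
Note on get_random_range in util.sh above: it hands its result back through the function's exit status ($?). Exit statuses are limited to 0..255, so this works for the small broker counts used in this test but would wrap around for larger bounds. A hedged sketch of an alternative that prints the number on stdout instead is shown below. The name random_in_range is hypothetical and is not part of util.sh.

```shell
#!/bin/bash
# Hypothetical alternative to get_random_range: print the value on stdout
# rather than returning it as an exit status (which is limited to 0..255).
# Note: bash's $RANDOM yields 0..32767, so ranges wider than that are
# not fully covered by this sketch.
random_in_range() {
    local lo=$1
    local up=$2
    local range=$(( up - lo + 1 ))
    echo $(( RANDOM % range + lo ))
}

# usage: capture the value with command substitution
idx=$(random_in_range 1 3)
echo "picked index: $idx"
```

The caller then reads the value with command substitution instead of inspecting $? immediately after the call, which also avoids the result being overwritten by the exit status of an intervening command.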