You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/06 19:27:06 UTC
svn commit: r1369888 -
/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
Author: nehanarkhede
Date: Mon Aug 6 17:27:05 2012
New Revision: 1369888
URL: http://svn.apache.org/viewvc?rev=1369888&view=rev
Log:
KAFKA-380 Enhance single_host_multi_brokers test with failure to trigger leader re-election in replication; patched by John Fung; reviewed by Neha Narkhede, Jun Rao and Joel Koshy
Modified:
incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1369888&r1=1369887&r2=1369888&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Mon Aug 6 17:27:05 2012
@@ -24,6 +24,7 @@
# (keep this section at the beginning
# of this script)
# ====================================
+readonly osname=`uname -s` # OS name
readonly system_test_root=$(dirname $0)/../.. # path of <kafka install>/system_test
readonly common_dir=${system_test_root}/common # common util scripts for system_test
source ${common_dir}/util.sh # include the util script
@@ -33,22 +34,23 @@ readonly base_dir_full_path=`cd $base_di
readonly config_dir=${base_dir}/config
readonly test_start_time="$(date +%s)" # time starting the test
+max_reelection_latency_ms=0
+min_reelection_latency_ms=10000
+sum_reelection_latency_ms=0
+reelection_counter=0
# ====================================
-# Change the followings as needed
+# No need to change the following
+# configurations in most cases
# ====================================
readonly num_kafka_server=3 # same no. of property files such as server_{1..n}.properties
# will be automatically generated
readonly replica_factor=3 # should be less than or equal to "num_kafka_server"
readonly my_brokerid_to_start=0 # this should be '0' for now
readonly my_server_port_to_start=9091 # if using this default, the ports to be used will be 9091, 9092, ...
-readonly producer_msg_batch_size=20 # batch no. of messsages by producer
+readonly producer_msg_batch_size=100 # batch no. of messages by producer
readonly consumer_timeout_ms=10000 # elapsed time for consumer to timeout and exit
-# ====================================
-# No need to change the following
-# configurations in most cases
-# ====================================
readonly test_topic=mytest
readonly max_wait_for_consumer_complete=30
readonly zk_prop_pathname=${config_dir}/zookeeper.properties
@@ -88,6 +90,7 @@ zk_data_log_dir=
pid_zk=
kafka_pids=
test_failure_counter=0
+leader_elected_timestamp=
initialize() {
info "initializing ..."
@@ -109,6 +112,7 @@ initialize() {
info "kafka $i prop file : ${kafka_prop_pathnames[$i]}"
info "kafka $i brokerid : ${kafka_brokerids[$i]}"
info "kafka $i socket : ${kafka_sock_ports[$i]}"
+ echo
done
info "zookeeper port : $zk_port"
@@ -147,6 +151,42 @@ get_leader_brokerid() {
return $broker_id
}
+get_elected_leader_unix_timestamp() { # sets full_elected_leader_unix_timestamp to "<epoch seconds>.<ms>" taken from the last 'is leader' broker log line
+ log_line=$(grep -i -h 'is leader' "${base_dir}"/kafka_server_*.log | sort | tail -1) # assumes each log line begins with its [timestamp], so the last line after sorting is the newest - TODO confirm
+ info "found the log line: $log_line"
+ # the timestamp is the text between the first '[' and the next ']', expected as "YYYY-MM-DD HH:MM:SS,mmm"
+ this_timestamp=$(echo "$log_line" | cut -f2 -d '[' | cut -f1 -d ']')
+ elected_leader_unix_timestamp_ms=$(echo "$this_timestamp" | cut -f2 -d ',') # text after the comma
+ this_timestamp=${this_timestamp%,*} # text before the comma
+ # Darwin's date needs an explicit input format; the month field is %m (%M means minutes)
+ if [ "x${osname}" == "xDarwin" ]; then
+ elected_leader_unix_timestamp=$(date -j -f "%Y-%m-%d %H:%M:%S" "$this_timestamp" +%s)
+ else
+ elected_leader_unix_timestamp=$(date -d "$this_timestamp" +%s)
+ fi
+
+ full_elected_leader_unix_timestamp="${elected_leader_unix_timestamp}.${elected_leader_unix_timestamp_ms}"
+}
+
+get_server_shutdown_unix_timestamp() { # sets full_server_shutdown_unix_timestamp to "<epoch seconds>.<ms>" taken from one broker's last 'shut down completed' log line
+ s_idx=$1 # $1: index into kafka_log4j_log_pathnames selecting the broker log to read
+ # path is quoted so an empty or odd entry fails in grep instead of making it read stdin
+ log_line=$(grep -i -h 'shut down completed' "${kafka_log4j_log_pathnames[$s_idx]}" | tail -1)
+ info "found the log line: $log_line"
+ # the timestamp is the text between the first '[' and the next ']', expected as "YYYY-MM-DD HH:MM:SS,mmm"
+ this_timestamp=$(echo "$log_line" | cut -f2 -d '[' | cut -f1 -d ']')
+ server_shutdown_unix_timestamp_ms=$(echo "$this_timestamp" | cut -f2 -d ',') # text after the comma
+ this_timestamp=${this_timestamp%,*} # text before the comma
+ # Darwin's date needs an explicit input format; the month field is %m (%M means minutes)
+ if [ "x${osname}" == "xDarwin" ]; then
+ server_shutdown_unix_timestamp=$(date -j -f "%Y-%m-%d %H:%M:%S" "$this_timestamp" +%s)
+ else
+ server_shutdown_unix_timestamp=$(date -d "$this_timestamp" +%s)
+ fi
+
+ full_server_shutdown_unix_timestamp="${server_shutdown_unix_timestamp}.${server_shutdown_unix_timestamp_ms}"
+}
+
start_zk() {
info "starting zookeeper"
$base_dir/../../bin/zookeeper-server-start.sh $zk_prop_pathname \
@@ -229,16 +269,24 @@ shutdown_servers() {
info "shutting down zookeeper servers"
if [ "x${pid_zk}" != "x" ]; then kill_child_processes 0 ${pid_zk}; fi
+
+ force_shutdown_producer
+ force_shutdown_consumer
+
+ # running processes are not terminated properly in a Hudson job,
+ # this is a temporary workaround to kill all processes
+ `ps axuw | grep "java\|run\-" | grep -v grep | grep -v slave | grep -v vi | grep -v "run\-test\.sh" | awk '{print $2}' | xargs kill -9`
+
}
force_shutdown_producer() {
info "force shutdown producer"
- `ps auxw | grep ProducerPerformance | awk '{print $2}' | xargs kill -9`
+ `ps auxw | grep ProducerPerformance | awk '{print $2}' | xargs kill -9 2> /dev/null` # SIGKILLs every PID whose ps line contains ProducerPerformance (the grep process itself can match too); stderr of xargs/kill is discarded
}
force_shutdown_consumer() {
info "force shutdown consumer"
- `ps auxw | grep ConsoleConsumer | awk '{print $2}' | xargs kill -9`
+ `ps auxw | grep ConsoleConsumer | awk '{print $2}' | xargs kill -9 2> /dev/null` # SIGKILLs every PID whose ps line contains ConsoleConsumer (the grep process itself can match too); stderr of xargs/kill is discarded
}
create_topic() {
@@ -270,7 +318,7 @@ validate_results() {
info "## ==> crc ${kafka_first_data_file_checksums[$i]}"
done
- # get the MessageID from messages produced and consumed
+ # get the MessageID from messages produced and consumed, sort them and output to log files
grep MessageID $console_consumer_log_pathname | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_mid_log_pathname
grep MessageID $producer_perf_log_pathname | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $producer_perf_mid_log_pathname
@@ -297,6 +345,9 @@ validate_results() {
validation_start_unix_ts=`date +%s`
curr_unix_ts=`date +%s`
size_unmatched_idx=1
+
+ # do a while-loop to check every 5 sec if the replica file sizes are matched
+ # (up to the value of $max_wait_for_consumer_complete)
while [[ $(( $curr_unix_ts - $validation_start_unix_ts )) -le $max_wait_for_consumer_complete && $size_unmatched_idx -gt 0 ]]
do
info "wait 5s (up to ${max_wait_for_consumer_complete}s) and check replicas data sizes"
@@ -315,38 +366,57 @@ validate_results() {
curr_unix_ts=`date +%s`
done
+ echo
# validate that sizes of all replicas should match
first_element_value=${kafka_first_data_file_sizes[1]}
for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
do
- if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
- info "## FAILURE: Unmatched size found"
- test_failure_counter=$(( $test_failure_counter + 1 ))
- fi
+ if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
+ info "## FAILURE: File[$i] zero file size found"
+ elif [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
+ info "## FAILURE: Unmatched size found"
+ test_failure_counter=$(( $test_failure_counter + 1 ))
+ else
+ info "## PASSED: Data files sizes matched"
+ fi
done
# validate that checksums of all replicas should match
first_element_value=${kafka_first_data_file_checksums[1]}
for ((i=2; i<=${#kafka_first_data_file_checksums[@]}; i++))
do
- if [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
- info "## FAILURE: Unmatched checksum found"
- test_failure_counter=$(( $test_failure_counter + 1 ))
- fi
+ if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
+ info "## FAILURE: Checksum cannot be validated because file[$i] zero file size found"
+ elif [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
+ info "## FAILURE: Unmatched checksum found"
+ test_failure_counter=$(( $test_failure_counter + 1 ))
+ else
+ info "## PASSED: Data files checksums matched"
+ fi
done
# validate that there is no data loss
if [ $uniq_msg_count_from_producer_perf -ne $uniq_msg_count_from_console_consumer ]; then
+ info "## FAILURE: Data loss found"
test_failure_counter=$(( $test_failure_counter + 1 ))
+ else
+ info "## PASSED: Message counts matched"
fi
+ avg_reelection_latency_ms=`echo "$sum_reelection_latency_ms / $reelection_counter" | bc`
+ info "## Max latency : $max_reelection_latency_ms ms"
+ info "## Min latency : $min_reelection_latency_ms ms"
+ info "## Avg latency : $avg_reelection_latency_ms ms"
+
# report PASSED or FAILED
info "========================================================"
if [ $test_failure_counter -eq 0 ]; then
info "## Test PASSED"
+ exit 0
else
info "## Test FAILED"
+ exit 1
fi
info "========================================================"
}
@@ -361,6 +431,9 @@ start_test() {
# Ctrl-c trap. Catches INT signal
trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" INT
+ trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" TERM
+ trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" KILL
+
generate_kafka_properties_files $base_dir_full_path $num_kafka_server $my_brokerid_to_start $my_server_port_to_start
@@ -383,10 +456,15 @@ start_test() {
for ((i=1; i<=$num_kafka_server; i++))
do
- info "kafka server [$i] - reading leader"
+ echo
+ info "======================================="
+ info "Iteration $i of $num_kafka_server"
+ info "======================================="
+ echo
+ info "looking up leader"
get_leader_brokerid
ldr_bkr_id=$?
- info "leader broker id: $ldr_bkr_id"
+ info "current leader's broker id : $ldr_bkr_id"
svr_idx=$(($ldr_bkr_id))
@@ -394,6 +472,30 @@ start_test() {
info "sleeping for 10s"
sleep 10
+ get_server_shutdown_unix_timestamp $svr_idx
+ get_elected_leader_unix_timestamp
+
+ reelected_leader_latency=`echo "$full_elected_leader_unix_timestamp - $full_server_shutdown_unix_timestamp" | bc`
+ reelected_leader_latency_ms_float=`echo "$reelected_leader_latency * 1000" | bc`
+ reelected_leader_latency_ms=${reelected_leader_latency_ms_float/.*}
+
+ info "---------------------------------------"
+ info "leader re-election latency : $reelected_leader_latency_ms ms"
+ info "---------------------------------------"
+
+ sum_reelection_latency_ms=$(($sum_reelection_latency_ms + $reelected_leader_latency_ms))
+ reelection_counter=$(($reelection_counter + 1))
+
+ # update $max_reelection_latency_ms
+ if [ $reelected_leader_latency_ms -gt $max_reelection_latency_ms ]; then
+ max_reelection_latency_ms=$reelected_leader_latency_ms
+ fi
+
+ # update $min_reelection_latency_ms
+ if [ $reelected_leader_latency_ms -le $min_reelection_latency_ms ]; then
+ min_reelection_latency_ms=$reelected_leader_latency_ms
+ fi
+
init_id=$(( ($i - 1) * $producer_msg_batch_size ))
start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
info "sleeping for 15s"
@@ -409,10 +511,10 @@ start_test() {
info "sleeping for 30s"
sleep 30
- validate_results
+ shutdown_servers
echo
- shutdown_servers
+ validate_results
echo
}