You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/06 19:27:06 UTC

svn commit: r1369888 - /incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh

Author: nehanarkhede
Date: Mon Aug  6 17:27:05 2012
New Revision: 1369888

URL: http://svn.apache.org/viewvc?rev=1369888&view=rev
Log:
KAFKA-380 Enhance single_host_multi_brokers test with failure to trigger leader re-election in replication; patched by John Fung; reviewed by Neha Narkhede, Jun Rao and Joel Koshy

Modified:
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1369888&r1=1369887&r2=1369888&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Mon Aug  6 17:27:05 2012
@@ -24,6 +24,7 @@
 # (keep this section at the beginning
 # of this script)
 # ====================================
+readonly osname=`uname -s`                            # OS name
 readonly system_test_root=$(dirname $0)/../..         # path of <kafka install>/system_test
 readonly common_dir=${system_test_root}/common        # common util scripts for system_test
 source   ${common_dir}/util.sh                        # include the util script
@@ -33,22 +34,23 @@ readonly base_dir_full_path=`cd $base_di
 readonly config_dir=${base_dir}/config
 
 readonly test_start_time="$(date +%s)"                # time starting the test
+max_reelection_latency_ms=0
+min_reelection_latency_ms=10000
+sum_reelection_latency_ms=0
+reelection_counter=0
 
 # ====================================
-# Change the followings as needed
+# No need to change the following
+# configurations in most cases
 # ====================================
 readonly num_kafka_server=3                           # same no. of property files such as server_{1..n}.properties
                                                       # will be automatically generated
 readonly replica_factor=3                             # should be less than or equal to "num_kafka_server"
 readonly my_brokerid_to_start=0                       # this should be '0' for now
 readonly my_server_port_to_start=9091                 # if using this default, the ports to be used will be 9091, 9092, ...
-readonly producer_msg_batch_size=20                   # batch no. of messsages by producer
+readonly producer_msg_batch_size=100                  # batch no. of messsages by producer
 readonly consumer_timeout_ms=10000                    # elapsed time for consumer to timeout and exit
 
-# ====================================
-# No need to change the following
-# configurations in most cases
-# ====================================
 readonly test_topic=mytest
 readonly max_wait_for_consumer_complete=30
 readonly zk_prop_pathname=${config_dir}/zookeeper.properties
@@ -88,6 +90,7 @@ zk_data_log_dir=
 pid_zk=
 kafka_pids=
 test_failure_counter=0
+leader_elected_timestamp=
 
 initialize() {
     info "initializing ..."
@@ -109,6 +112,7 @@ initialize() {
         info "kafka $i prop file  : ${kafka_prop_pathnames[$i]}"
         info "kafka $i brokerid   : ${kafka_brokerids[$i]}"
         info "kafka $i socket     : ${kafka_sock_ports[$i]}"
+        echo
     done
 
     info "zookeeper port     : $zk_port"
@@ -147,6 +151,68 @@ get_leader_brokerid() {
     return $broker_id
 }
 
+# get_elected_leader_unix_timestamp
+#   Takes the last "is leader" line across ${base_dir}/kafka_server_*.log
+#   after a lexical sort (chronological, assuming each line begins with a
+#   "[yyyy-mm-dd HH:MM:SS,mmm]" log4j timestamp) and converts that
+#   timestamp to Unix time.
+# Globals written:
+#   elected_leader_unix_timestamp      - whole seconds since the epoch
+#   elected_leader_unix_timestamp_ms   - text after the ',' (milliseconds)
+#   full_elected_leader_unix_timestamp - "<sec>.<ms>", fed to bc by callers
+get_elected_leader_unix_timestamp() {
+    local log_line this_timestamp
+
+    log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
+    info "found the log line: $log_line"
+
+    # "[yyyy-mm-dd HH:MM:SS,mmm] ..." -> "yyyy-mm-dd HH:MM:SS" and "mmm"
+    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
+    elected_leader_unix_timestamp_ms=`echo $this_timestamp | cut -f2 -d ','`
+    this_timestamp=`echo ${this_timestamp%,*}`
+
+    # BSD date (Darwin) needs an explicit input format; %m is the month,
+    # whereas %M is minutes and would leave the month unparsed.
+    if [ "x${osname}" == "xDarwin" ]; then
+        elected_leader_unix_timestamp=`date -j -f "%Y-%m-%d %H:%M:%S" "$this_timestamp" +%s`
+    else
+        elected_leader_unix_timestamp=`date -d "$this_timestamp" +%s`
+    fi
+
+    full_elected_leader_unix_timestamp="${elected_leader_unix_timestamp}.${elected_leader_unix_timestamp_ms}"
+}
+
+# get_server_shutdown_unix_timestamp <server index>
+#   Takes the last "shut down completed" line from
+#   ${kafka_log4j_log_pathnames[<server index>]} and converts its
+#   "[yyyy-mm-dd HH:MM:SS,mmm]" log4j timestamp to Unix time.
+# Globals written:
+#   server_shutdown_unix_timestamp      - whole seconds since the epoch
+#   server_shutdown_unix_timestamp_ms   - text after the ',' (milliseconds)
+#   full_server_shutdown_unix_timestamp - "<sec>.<ms>", fed to bc by callers
+get_server_shutdown_unix_timestamp() {
+    local s_idx=$1
+    local log_line this_timestamp
+
+    log_line=`grep -i -h 'shut down completed' ${kafka_log4j_log_pathnames[$s_idx]} | tail -1`
+    info "found the log line: $log_line"
+
+    # "[yyyy-mm-dd HH:MM:SS,mmm] ..." -> "yyyy-mm-dd HH:MM:SS" and "mmm"
+    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
+    server_shutdown_unix_timestamp_ms=`echo $this_timestamp | cut -f2 -d ','`
+    this_timestamp=`echo ${this_timestamp%,*}`
+
+    # BSD date (Darwin) needs an explicit input format; %m is the month,
+    # whereas %M is minutes and would leave the month unparsed.
+    if [ "x${osname}" == "xDarwin" ]; then
+        server_shutdown_unix_timestamp=`date -j -f "%Y-%m-%d %H:%M:%S" "$this_timestamp" +%s`
+    else
+        server_shutdown_unix_timestamp=`date -d "$this_timestamp" +%s`
+    fi
+
+    full_server_shutdown_unix_timestamp="${server_shutdown_unix_timestamp}.${server_shutdown_unix_timestamp_ms}"
+}
+
 start_zk() {
     info "starting zookeeper"
     $base_dir/../../bin/zookeeper-server-start.sh $zk_prop_pathname \
@@ -229,16 +269,29 @@ shutdown_servers() {
 
     info "shutting down zookeeper servers"
     if [ "x${pid_zk}" != "x" ]; then kill_child_processes 0 ${pid_zk}; fi
+
+    force_shutdown_producer
+    force_shutdown_consumer
+
+    # running processes are not terminated properly in a Hudson job,
+    # this is a temporary workaround to kill all processes
+    ps axuw | grep "java\|run\-" | grep -v grep | grep -v slave | grep -v vi | grep -v "run\-test\.sh" | awk '{print $2}' | xargs kill -9 2> /dev/null
+
 }
 
+# Force-kill (SIGKILL) every process whose full command line contains
+# "ProducerPerformance". pkill -f matches the whole command line and never
+# matches itself (a "ps | grep" pipeline would also list its own grep).
 force_shutdown_producer() {
     info "force shutdown producer"
-    `ps auxw | grep ProducerPerformance | awk '{print $2}' | xargs kill -9`
+    pkill -9 -f ProducerPerformance 2> /dev/null
 }
 
+# Force-kill (SIGKILL) every process whose full command line contains
+# "ConsoleConsumer" (same approach as force_shutdown_producer).
 force_shutdown_consumer() {
     info "force shutdown consumer"
-    `ps auxw | grep ConsoleConsumer | awk '{print $2}' | xargs kill -9`
+    pkill -9 -f ConsoleConsumer 2> /dev/null
 }
 
 create_topic() {
@@ -270,7 +318,7 @@ validate_results() {
         info "##     ==> crc ${kafka_first_data_file_checksums[$i]}"
     done
 
-    # get the MessageID from messages produced and consumed
+    # get the MessageID from messages produced and consumed, sort them and output to log files
     grep MessageID $console_consumer_log_pathname | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_mid_log_pathname
     grep MessageID $producer_perf_log_pathname    | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $producer_perf_mid_log_pathname
 
@@ -297,6 +345,9 @@ validate_results() {
     validation_start_unix_ts=`date +%s`
     curr_unix_ts=`date +%s`
     size_unmatched_idx=1
+
+    # do a while-loop to check every 5 sec if the replica file sizes are matched
+    # (up to the value of $max_wait_for_consumer_complete)
     while [[ $(( $curr_unix_ts - $validation_start_unix_ts )) -le $max_wait_for_consumer_complete && $size_unmatched_idx -gt 0 ]]
     do
         info "wait 5s (up to ${max_wait_for_consumer_complete}s) and check replicas data sizes"
@@ -315,38 +366,57 @@ validate_results() {
 
         curr_unix_ts=`date +%s`
     done
+    echo
 
     # validate that sizes of all replicas should match
     first_element_value=${kafka_first_data_file_sizes[1]}
     for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
     do
-        if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
-            info "## FAILURE: Unmatched size found"
-            test_failure_counter=$(( $test_failure_counter + 1 ))
-        fi
+        if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
+            info "## FAILURE: File[$i] zero file size found"
+        elif [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
+             info "## FAILURE: Unmatched size found"
+             test_failure_counter=$(( $test_failure_counter + 1 ))
+        else
+            info "## PASSED: Data files sizes matched"
+         fi
     done
 
     # validate that checksums of all replicas should match
     first_element_value=${kafka_first_data_file_checksums[1]}
     for ((i=2; i<=${#kafka_first_data_file_checksums[@]}; i++))
     do
-        if [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
-            info "## FAILURE: Unmatched checksum found"
-            test_failure_counter=$(( $test_failure_counter + 1 ))
-        fi
+        if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
+            info "## FAILURE: Checksum cannot be validated because file[$i] zero file size found"
+        elif [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
+             info "## FAILURE: Unmatched checksum found"
+             test_failure_counter=$(( $test_failure_counter + 1 ))
+        else
+            info "## PASSED: Data files checksums matched"
+         fi
     done
 
     # validate that there is no data loss
     if [ $uniq_msg_count_from_producer_perf -ne $uniq_msg_count_from_console_consumer ]; then
+        info "## FAILURE: Data loss found"
         test_failure_counter=$(( $test_failure_counter + 1 ))
+    else
+        info "## PASSED: Message counts matched"
     fi
 
+    avg_reelection_latency_ms=`echo "$sum_reelection_latency_ms / $reelection_counter" | bc`
+    info "## Max latency : $max_reelection_latency_ms ms"
+    info "## Min latency : $min_reelection_latency_ms ms"
+    info "## Avg latency : $avg_reelection_latency_ms ms"
+
     # report PASSED or FAILED
     info "========================================================"
     if [ $test_failure_counter -eq 0 ]; then
         info "## Test PASSED"
+        exit 0
     else
         info "## Test FAILED"
+        exit 1
     fi
     info "========================================================"
 }
@@ -361,6 +431,9 @@ start_test() {
 
     # Ctrl-c trap. Catches INT signal
     trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" INT
+    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" TERM
+    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" KILL
+
 
     generate_kafka_properties_files $base_dir_full_path $num_kafka_server $my_brokerid_to_start $my_server_port_to_start 
 
@@ -383,10 +456,15 @@ start_test() {
 
     for ((i=1; i<=$num_kafka_server; i++))
     do
-        info "kafka server [$i] - reading leader"
+        echo
+        info "======================================="
+        info "Iteration $i of $num_kafka_server"
+        info "======================================="
+        echo
+        info "looking up leader"
         get_leader_brokerid
         ldr_bkr_id=$?
-        info "leader broker id: $ldr_bkr_id"
+        info "current leader's broker id : $ldr_bkr_id"
 
         svr_idx=$(($ldr_bkr_id))
 
@@ -394,6 +472,30 @@ start_test() {
         info "sleeping for 10s"
         sleep 10
 
+        get_server_shutdown_unix_timestamp $svr_idx
+        get_elected_leader_unix_timestamp
+
+        reelected_leader_latency=`echo "$full_elected_leader_unix_timestamp - $full_server_shutdown_unix_timestamp" | bc`
+        reelected_leader_latency_ms_float=`echo "$reelected_leader_latency * 1000" | bc`
+        reelected_leader_latency_ms=${reelected_leader_latency_ms_float/.*}
+
+        info "---------------------------------------"
+        info "leader re-election latency : $reelected_leader_latency_ms ms"
+        info "---------------------------------------"
+
+        sum_reelection_latency_ms=$(($sum_reelection_latency_ms + $reelected_leader_latency_ms))
+        reelection_counter=$(($reelection_counter + 1))
+
+        # update $max_reelection_latency_ms
+        if [ $reelected_leader_latency_ms -gt $max_reelection_latency_ms ]; then
+            max_reelection_latency_ms=$reelected_leader_latency_ms
+        fi
+
+        # update $min_reelection_latency_ms
+        if [ $reelected_leader_latency_ms -le $min_reelection_latency_ms ]; then
+            min_reelection_latency_ms=$reelected_leader_latency_ms
+        fi
+
         init_id=$(( ($i - 1) * $producer_msg_batch_size ))
         start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
         info "sleeping for 15s"
@@ -409,10 +511,10 @@ start_test() {
     info "sleeping for 30s"
     sleep 30
 
-    validate_results
+    shutdown_servers
     echo
 
-    shutdown_servers
+    validate_results
     echo
 }