You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/07 18:38:16 UTC

svn commit: r1241528 - in /incubator/kafka/trunk: core/src/main/scala/kafka/consumer/ perf/src/main/scala/kafka/perf/ system_test/broker_failure/bin/ system_test/broker_failure/config/

Author: nehanarkhede
Date: Tue Feb  7 17:38:15 2012
New Revision: 1241528

URL: http://svn.apache.org/viewvc?rev=1241528&view=rev
Log:
KAFKA-263 Enhance single host broker failure test to have 2 topics with uneven distribution on the source brokers; patched by John Fung; reviewed by Neha Narkhede

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
    incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
    incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties
    incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties
    incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties
    incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties
    incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Feb  7 17:38:15 2012
@@ -210,9 +210,19 @@ object ConsoleConsumer extends Logging {
   }
 
   class ChecksumMessageFormatter extends MessageFormatter {
+    private var topicStr: String = _
+    
+    override def init(props: Properties) {
+      topicStr = props.getProperty("topic")
+      if (topicStr != null) 
+        topicStr = topicStr + "-"
+      else
+        topicStr = ""
+    }
+    
     def writeTo(message: Message, output: PrintStream) {
       val chksum = message.checksum
-      output.println("checksum:" + chksum)
+      output.println(topicStr + "checksum:" + chksum)
     }
   }
   

Modified: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Feb  7 17:38:15 2012
@@ -20,18 +20,16 @@ package kafka.perf
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.producer._
-import async.DefaultEventHandler
 import org.apache.log4j.Logger
-import joptsimple.OptionParser
 import kafka.message.{CompressionCodec, Message}
-import kafka.serializer.DefaultEncoder
 import java.text.SimpleDateFormat
-import java.util.{Date, Random, Properties}
+import java.util.{Random, Properties}
+import kafka.utils.Logging
 
 /**
  * Load test for the producer
  */
-object ProducerPerformance {
+object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
 
@@ -141,7 +139,6 @@ object ProducerPerformance {
                        val totalMessagesSent: AtomicLong,
                        val allDone: CountDownLatch,
                        val rand: Random) extends Runnable {
-    val logger = Logger.getLogger(getClass)
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
     if (brokerInfoList(0) == "zk.connect") {
@@ -171,7 +168,7 @@ object ProducerPerformance {
       var lastReportTime = reportTime
       val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
                               else config.numMessages / config.numThreads
-      if(logger.isDebugEnabled) logger.debug("Messages per thread = " + messagesPerThread)
+      debug("Messages per thread = " + messagesPerThread)
       var messageSet: List[Message] = Nil
       if(config.isFixSize) {
         for(k <- 0 until config.batchSize) {
@@ -203,11 +200,11 @@ object ProducerPerformance {
               rand.nextBytes(messageBytes)
               val message = new Message(messageBytes)
               producer.send(new ProducerData[Message,Message](config.topic, message))
-              if(logger.isDebugEnabled) println("checksum:" + message.checksum)
+              debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
             }else {
               producer.send(new ProducerData[Message,Message](config.topic, message))
-              if(logger.isDebugEnabled) println("checksum:" + message.checksum)
+              debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
             }
             nSends += 1

Modified: incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh Tue Feb  7 17:38:15 2012
@@ -68,12 +68,14 @@ readonly test_start_time="$(date +%s)"
 
 readonly num_msg_per_batch=500
 readonly batches_per_iteration=5
-readonly num_iterations=12
+readonly num_iterations=1
 
 readonly zk_source_port=2181
 readonly zk_mirror_port=2182
 
-readonly topic_1=test01
+readonly topic_prefix=test
+readonly max_topic_id=2
+readonly unbalanced_start_id=2
 readonly consumer_grp=group1
 readonly source_console_consumer_grp=source
 readonly mirror_console_consumer_grp=mirror
@@ -96,10 +98,16 @@ readonly num_kafka_target_server=3
 readonly wait_time_after_killing_broker=0
 readonly wait_time_after_restarting_broker=5
 
-background_producer_pid=
+readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094"
+readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093"
+
+background_producer_pid_1=
+background_producer_pid_2=
+
 no_bouncing=$#
 
 iter=1
+abort_test=false
 
 pid_zk_source=
 pid_zk_target=
@@ -177,17 +185,29 @@ get_random_range() {
 
 verify_consumer_rebalancing() {
 
-   info "Verifying consumer rebalancing operation"
+    info "Verifying consumer rebalancing operation"
 
-    $base_dir/bin/kafka-run-class.sh \
-        kafka.tools.VerifyConsumerRebalance \
-        --zk.connect=localhost:2181 \
-        --group $consumer_grp \
-     2>&1 >> $consumer_rebalancing_log
+    CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \
+                                 kafka.tools.VerifyConsumerRebalance \
+                                 --zk.connect=localhost:2181 \
+                                 --group $consumer_grp`
+    echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log
+
+    REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1`
+    # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE"
+    REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"`
+    info "REBALANCE_STATUS: $REBALANCE_STATUS"
+
+    if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then
+        info "setting abort_test to true due to Rebalance operation failed"
+        abort_test="true"
+    fi
 }
 
 wait_for_zero_consumer_lags() {
 
+    topic_id=$1
+
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
 
@@ -196,7 +216,7 @@ wait_for_zero_consumer_lags() {
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
                        --group $consumer_grp --zkconnect localhost:$zk_source_port \
-                       --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -217,6 +237,8 @@ wait_for_zero_consumer_lags() {
 
 wait_for_zero_source_console_consumer_lags() {
 
+    topic_id=$1
+
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
 
@@ -225,7 +247,7 @@ wait_for_zero_source_console_consumer_la
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
                        --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \
-                       --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -246,6 +268,8 @@ wait_for_zero_source_console_consumer_la
 
 wait_for_zero_mirror_console_consumer_lags() {
 
+    topic_id=$1
+
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
 
@@ -254,7 +278,7 @@ wait_for_zero_mirror_console_consumer_la
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
                        --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \
-                       --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -321,6 +345,8 @@ cleanup() {
     rm -f $console_consumer_source_crc_sorted_log
     rm -f $console_consumer_mirror_crc_sorted_uniq_log
     rm -f $console_consumer_source_crc_sorted_uniq_log
+
+    rm -f $consumer_rebalancing_log
 }
 
 start_zk() {
@@ -380,40 +406,65 @@ start_embedded_consumer_server() {
 }
 
 start_console_consumer_for_source_producer() {
-    info "starting console consumers for source producer"
+
+    topic_id=$1
+
+    info "starting console consumers for source producer on topic id [$topic_id]"
 
     $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
         --zookeeper localhost:$zk_source_port \
-        --topic $topic_1 \
+        --topic ${topic_prefix}_${topic_id} \
         --group $source_console_consumer_grp \
-        --from-beginning \
+        --from-beginning --consumer-timeout-ms 5000 \
         --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
-        2>&1 > ${console_consumer_source_log} &
-    console_consumer_source_pid=$!
-
-    info "  -> console consumer source pid: $console_consumer_source_pid"
+        --property topic=${topic_prefix}_${topic_id} \
+        2>&1 >> ${console_consumer_source_log} 
 }
 
 start_console_consumer_for_mirror_producer() {
-    info "starting console consumers for mirroring producer"
+
+    topic_id=$1
+
+    info "starting console consumers for mirroring producer on topic id [$topic_id]"
 
     $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
         --zookeeper localhost:$zk_mirror_port \
-        --topic $topic_1 \
+        --topic ${topic_prefix}_${topic_id} \
         --group $mirror_console_consumer_grp \
-        --from-beginning \
+        --from-beginning --consumer-timeout-ms 5000 \
         --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
-        2>&1 > ${console_consumer_mirror_log} &
-    console_consumer_mirror_pid=$!
+        --property topic=${topic_prefix}_${topic_id} \
+        2>&1 >> ${console_consumer_mirror_log} 
+}
 
-    info "  -> console consumer mirror pid: $console_consumer_mirror_pid"
+consume_source_producer_messages() {
+    consumer_counter=1
+    while [ $consumer_counter -le $max_topic_id ]
+    do
+        start_console_consumer_for_source_producer $consumer_counter
+        consumer_counter=$(( $consumer_counter + 1 ))
+    done
+}
+
+consume_mirror_producer_messages() {
+    consumer_counter=1
+    while [ $consumer_counter -le $max_topic_id ]
+    do
+        start_console_consumer_for_mirror_producer $consumer_counter
+        consumer_counter=$(( $consumer_counter + 1 ))
+    done
 }
 
 shutdown_producer() {
     info "shutting down producer"
-    if [ "x${background_producer_pid}" != "x" ]; then
-        # kill_child_processes 0 ${background_producer_pid};
-        kill -TERM ${background_producer_pid} 2> /dev/null;
+    if [ "x${background_producer_pid_1}" != "x" ]; then
+        # kill_child_processes 0 ${background_producer_pid_1};
+        kill -TERM ${background_producer_pid_1} 2> /dev/null;
+    fi
+
+    if [ "x${background_producer_pid_2}" != "x" ]; then
+        # kill_child_processes 0 ${background_producer_pid_2};
+        kill -TERM ${background_producer_pid_2} 2> /dev/null;
     fi
 }
 
@@ -450,13 +501,15 @@ shutdown_servers() {
 }
 
 start_background_producer() {
+    bkrinfo_str=$1
+    start_topic_id=$2
+    end_topic_id=$3
 
     batch_no=0
-    curr_iter=0
+    topic_id=${start_topic_id}
 
-    while [ $num_iterations -gt $curr_iter ]
+    while [ 'x' == 'x' ]
     do
-        topic=$1
         sleeptime=
 
         get_random_range $sleep_min $sleep_max
@@ -464,19 +517,24 @@ start_background_producer() {
 
         batch_no=$(($batch_no + 1))
 
+        if [ $topic_id -gt $end_topic_id ]; then
+            topic_id=${start_topic_id}
+        fi
+
         $base_dir/bin/kafka-run-class.sh \
             kafka.perf.ProducerPerformance \
-            --brokerinfo zk.connect=localhost:2181 \
-            --topic $topic \
+            --brokerinfo $bkrinfo_str \
+            --topic ${topic_prefix}_${topic_id} \
             --messages $num_msg_per_batch \
             --message-size $message_size \
             --batch-size 50 \
             --vary-message-size \
             --threads 1 \
-            --reporting-interval $num_msg_per_batch \
-            --async \
+            --reporting-interval $num_msg_per_batch --async \
             2>&1 >> $base_dir/producer_performance.log    # appending all producers' msgs
 
+        topic_id=$(( $topic_id + 1 ))
+
         sleep $sleeptime
     done
 }
@@ -485,9 +543,9 @@ cmp_checksum() {
 
     cmp_result=0
 
-    grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
-    grep ^checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
-    grep ^checksum $producer_performance_log | tr -d ' ' | cut -f2 -d ':' > $producer_performance_crc_log
+    grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
+    grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
+    grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log
 
     sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log
     sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log
@@ -555,6 +613,37 @@ cmp_checksum() {
     echo "========================================================" >> $checksum_diff_log
     echo "${duplicate_mirror_crc}"                                  >> $checksum_diff_log
 
+    topic_chksum_counter=1
+    while [ $topic_chksum_counter -le $max_topic_id ]
+    do
+        # get producer topic counts
+        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log`
+        echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+        topic_chksum_counter=$(($topic_chksum_counter + 1))
+    done
+    echo
+
+    topic_chksum_counter=1
+    while [ $topic_chksum_counter -le $max_topic_id ]
+    do
+        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log`
+        echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+        topic_chksum_counter=$(($topic_chksum_counter + 1))
+    done
+    echo
+
+    topic_chksum_counter=1
+    while [ $topic_chksum_counter -le $max_topic_id ]
+    do
+        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log`
+        echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+        topic_chksum_counter=$(($topic_chksum_counter + 1))
+    done
+    echo
+
     return $cmp_result
 }
 
@@ -567,15 +656,32 @@ start_test() {
     start_target_servers_cluster
     sleep 2
 
-    start_background_producer $topic_1 &
-    background_producer_pid=$!
+    start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) &
+    background_producer_pid_1=$!
 
     info "=========================================="
-    info "Started background producer pid [${background_producer_pid}]"
+    info "Started background producer pid [${background_producer_pid_1}]"
     info "=========================================="
 
-    sleep 5
-    
+    sleep 10
+   
+    start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id &
+    background_producer_pid_2=$!
+
+    info "=========================================="
+    info "Started background producer pid [${background_producer_pid_2}]"
+    info "=========================================="
+
+    sleep 10
+
+    verify_consumer_rebalancing
+
+    info "abort_test: [${abort_test}]"
+    if [ "${abort_test}_x" == "true_x" ]; then
+        info "aborting test"
+        iter=$((${num_iterations} + 1))
+    fi
+ 
     while [ $num_iterations -ge $iter ]
     do
         echo
@@ -592,7 +698,6 @@ start_test() {
                 # even iterations -> bounce target kafka borker
                 get_random_range 1 $num_kafka_target_server 
                 idx=$?
-
                 if [ "x${kafka_target_pids[$idx]}" != "x" ]; then
                     echo
                     info "#### Bouncing kafka TARGET broker ####"
@@ -631,7 +736,15 @@ start_test() {
                     sleep $wait_time_after_restarting_broker
                 fi
             fi
+
             verify_consumer_rebalancing
+
+            info "abort_test: [${abort_test}]"
+            if [ "${abort_test}_x" == "true_x" ]; then
+                info "aborting test"
+                iter=$((${num_iterations} + 1))
+            fi
+
         else
             info "No bouncing performed"
         fi
@@ -670,8 +783,8 @@ trap "shutdown_producer; shutdown_server
 
 start_test
 
-start_console_consumer_for_source_producer
-start_console_consumer_for_mirror_producer
+consume_source_producer_messages
+consume_mirror_producer_messages
 
 wait_for_zero_source_console_consumer_lags
 wait_for_zero_mirror_console_consumer_lags

Modified: incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties Tue Feb  7 17:38:15 2012
@@ -12,30 +12,74 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=INFO, stdout
 
+log4j.rootLogger=INFO, stdout, kafkaAppender
+
+# ====================================
+# messages going to kafkaAppender
+# ====================================
+log4j.logger.kafka=DEBUG, kafkaAppender
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, kafkaAppender
+log4j.logger.org.apache.zookeeper=INFO, kafkaAppender
+
+# ====================================
+# messages going to zookeeperAppender
+# ====================================
+# (comment out this line to redirect ZK-related messages to kafkaAppender
+#  to allow reading both Kafka and ZK debugging messages in a single file)
+#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
+
+# ====================================
+# stdout
+# ====================================
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
 
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
-
-
-# Turn on all our debugging info
-#log4j.logger.kafka=INFO
-log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
-log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.kafka.consumer=DEBUG
-log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
-log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
+# ====================================
+# fileAppender
+# ====================================
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=/tmp/kafka_all_request.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# ====================================
+# kafkaAppender
+# ====================================
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.File=/tmp/kafka.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.additivity.kafka=true
+
+# ====================================
+# zookeeperAppender
+# ====================================
+log4j.appender.zookeeperAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.zookeeperAppender.File=/tmp/zookeeper.log
+log4j.appender.zookeeperAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.zookeeperAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.additivity.org.apache.zookeeper=false
+
+# ====================================
+# other available debugging info 
+# ====================================
+#log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
+#log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
 #log4j.logger.kafka.producer.async.AsyncProducer=TRACE
 #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE
-log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+#log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+
+log4j.logger.kafka.consumer=DEBUG
 log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
 
 # to print message checksum from ProducerPerformance
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG 
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+
+# to print socket buffer size validated by Kafka broker
+log4j.logger.kafka.network.Acceptor=DEBUG
+
+# to print socket buffer size validated by SimpleConsumer
+log4j.logger.kafka.consumer.SimpleConsumer=TRACE
 

Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties Tue Feb  7 17:38:15 2012
@@ -15,7 +15,7 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9093
+broker.list=0:localhost:9081
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties Tue Feb  7 17:38:15 2012
@@ -15,7 +15,7 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9094
+broker.list=0:localhost:9082
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties Tue Feb  7 17:38:15 2012
@@ -15,7 +15,7 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9095
+broker.list=0:localhost:9083
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=1
 num.partitions=1
 
 # the port the socket server runs on
-port=9092
+port=9091
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=2
 num.partitions=1
 
 # the port the socket server runs on
-port=9091
+port=9092
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=3
 num.partitions=1
 
 # the port the socket server runs on
-port=9090
+port=9093
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=4
 num.partitions=1
 
 # the port the socket server runs on
-port=9096
+port=9094
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=1
 num.partitions=1
 
 # the port the socket server runs on
-port=9093
+port=9081
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=2
 num.partitions=1
 
 # the port the socket server runs on
-port=9094
+port=9082
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties Tue Feb  7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=3
 num.partitions=1
 
 # the port the socket server runs on
-port=9095
+port=9083
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties Tue Feb  7 17:38:15 2012
@@ -25,5 +25,5 @@ zk.connectiontimeout.ms=1000000
 #consumer group id
 groupid=group1
 
-mirror.topics.whitelist=test01
-
+mirror.topics.whitelist=test_1,test_2
+autooffset.reset=smallest