You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/07 18:38:16 UTC
svn commit: r1241528 - in /incubator/kafka/trunk:
core/src/main/scala/kafka/consumer/ perf/src/main/scala/kafka/perf/
system_test/broker_failure/bin/ system_test/broker_failure/config/
Author: nehanarkhede
Date: Tue Feb 7 17:38:15 2012
New Revision: 1241528
URL: http://svn.apache.org/viewvc?rev=1241528&view=rev
Log:
KAFKA-263 Enhance single host broker failure test to have 2 topics with uneven distribution on the source brokers; patched by John Fung; reviewed by Neha Narkhede
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties
incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties
incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties
incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties
incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Feb 7 17:38:15 2012
@@ -210,9 +210,19 @@ object ConsoleConsumer extends Logging {
}
class ChecksumMessageFormatter extends MessageFormatter {
+ private var topicStr: String = _
+
+ override def init(props: Properties) {
+ topicStr = props.getProperty("topic")
+ if (topicStr != null)
+ topicStr = topicStr + "-"
+ else
+ topicStr = ""
+ }
+
def writeTo(message: Message, output: PrintStream) {
val chksum = message.checksum
- output.println("checksum:" + chksum)
+ output.println(topicStr + "checksum:" + chksum)
}
}
Modified: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Feb 7 17:38:15 2012
@@ -20,18 +20,16 @@ package kafka.perf
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import kafka.producer._
-import async.DefaultEventHandler
import org.apache.log4j.Logger
-import joptsimple.OptionParser
import kafka.message.{CompressionCodec, Message}
-import kafka.serializer.DefaultEncoder
import java.text.SimpleDateFormat
-import java.util.{Date, Random, Properties}
+import java.util.{Random, Properties}
+import kafka.utils.Logging
/**
* Load test for the producer
*/
-object ProducerPerformance {
+object ProducerPerformance extends Logging {
def main(args: Array[String]) {
@@ -141,7 +139,6 @@ object ProducerPerformance {
val totalMessagesSent: AtomicLong,
val allDone: CountDownLatch,
val rand: Random) extends Runnable {
- val logger = Logger.getLogger(getClass)
val props = new Properties()
val brokerInfoList = config.brokerInfo.split("=")
if (brokerInfoList(0) == "zk.connect") {
@@ -171,7 +168,7 @@ object ProducerPerformance {
var lastReportTime = reportTime
val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
else config.numMessages / config.numThreads
- if(logger.isDebugEnabled) logger.debug("Messages per thread = " + messagesPerThread)
+ debug("Messages per thread = " + messagesPerThread)
var messageSet: List[Message] = Nil
if(config.isFixSize) {
for(k <- 0 until config.batchSize) {
@@ -203,11 +200,11 @@ object ProducerPerformance {
rand.nextBytes(messageBytes)
val message = new Message(messageBytes)
producer.send(new ProducerData[Message,Message](config.topic, message))
- if(logger.isDebugEnabled) println("checksum:" + message.checksum)
+ debug(config.topic + "-checksum:" + message.checksum)
bytesSent += message.payloadSize
}else {
producer.send(new ProducerData[Message,Message](config.topic, message))
- if(logger.isDebugEnabled) println("checksum:" + message.checksum)
+ debug(config.topic + "-checksum:" + message.checksum)
bytesSent += message.payloadSize
}
nSends += 1
Modified: incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh Tue Feb 7 17:38:15 2012
@@ -68,12 +68,14 @@ readonly test_start_time="$(date +%s)"
readonly num_msg_per_batch=500
readonly batches_per_iteration=5
-readonly num_iterations=12
+readonly num_iterations=1
readonly zk_source_port=2181
readonly zk_mirror_port=2182
-readonly topic_1=test01
+readonly topic_prefix=test
+readonly max_topic_id=2
+readonly unbalanced_start_id=2
readonly consumer_grp=group1
readonly source_console_consumer_grp=source
readonly mirror_console_consumer_grp=mirror
@@ -96,10 +98,16 @@ readonly num_kafka_target_server=3
readonly wait_time_after_killing_broker=0
readonly wait_time_after_restarting_broker=5
-background_producer_pid=
+readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094"
+readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093"
+
+background_producer_pid_1=
+background_producer_pid_2=
+
no_bouncing=$#
iter=1
+abort_test=false
pid_zk_source=
pid_zk_target=
@@ -177,17 +185,29 @@ get_random_range() {
verify_consumer_rebalancing() {
- info "Verifying consumer rebalancing operation"
+ info "Verifying consumer rebalancing operation"
- $base_dir/bin/kafka-run-class.sh \
- kafka.tools.VerifyConsumerRebalance \
- --zk.connect=localhost:2181 \
- --group $consumer_grp \
- 2>&1 >> $consumer_rebalancing_log
+ CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \
+ kafka.tools.VerifyConsumerRebalance \
+ --zk.connect=localhost:2181 \
+ --group $consumer_grp`
+ echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log
+
+ REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1`
+ # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE"
+ REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"`
+ info "REBALANCE_STATUS: $REBALANCE_STATUS"
+
+ if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then
+ info "setting abort_test to true due to Rebalance operation failed"
+ abort_test="true"
+ fi
}
wait_for_zero_consumer_lags() {
+ topic_id=$1
+
# no of times to check for zero lagging
no_of_zero_to_verify=3
@@ -196,7 +216,7 @@ wait_for_zero_consumer_lags() {
TOTAL_LAG=0
CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
--group $consumer_grp --zkconnect localhost:$zk_source_port \
- --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+ --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
for lag in $CONSUMER_LAGS;
do
@@ -217,6 +237,8 @@ wait_for_zero_consumer_lags() {
wait_for_zero_source_console_consumer_lags() {
+ topic_id=$1
+
# no of times to check for zero lagging
no_of_zero_to_verify=3
@@ -225,7 +247,7 @@ wait_for_zero_source_console_consumer_la
TOTAL_LAG=0
CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
--group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \
- --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+ --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
for lag in $CONSUMER_LAGS;
do
@@ -246,6 +268,8 @@ wait_for_zero_source_console_consumer_la
wait_for_zero_mirror_console_consumer_lags() {
+ topic_id=$1
+
# no of times to check for zero lagging
no_of_zero_to_verify=3
@@ -254,7 +278,7 @@ wait_for_zero_mirror_console_consumer_la
TOTAL_LAG=0
CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
--group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \
- --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+ --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
for lag in $CONSUMER_LAGS;
do
@@ -321,6 +345,8 @@ cleanup() {
rm -f $console_consumer_source_crc_sorted_log
rm -f $console_consumer_mirror_crc_sorted_uniq_log
rm -f $console_consumer_source_crc_sorted_uniq_log
+
+ rm -f $consumer_rebalancing_log
}
start_zk() {
@@ -380,40 +406,65 @@ start_embedded_consumer_server() {
}
start_console_consumer_for_source_producer() {
- info "starting console consumers for source producer"
+
+ topic_id=$1
+
+ info "starting console consumers for source producer on topic id [$topic_id]"
$base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
--zookeeper localhost:$zk_source_port \
- --topic $topic_1 \
+ --topic ${topic_prefix}_${topic_id} \
--group $source_console_consumer_grp \
- --from-beginning \
+ --from-beginning --consumer-timeout-ms 5000 \
--formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
- 2>&1 > ${console_consumer_source_log} &
- console_consumer_source_pid=$!
-
- info " -> console consumer source pid: $console_consumer_source_pid"
+ --property topic=${topic_prefix}_${topic_id} \
+ 2>&1 >> ${console_consumer_source_log}
}
start_console_consumer_for_mirror_producer() {
- info "starting console consumers for mirroring producer"
+
+ topic_id=$1
+
+ info "starting console consumers for mirroring producer on topic id [$topic_id]"
$base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
--zookeeper localhost:$zk_mirror_port \
- --topic $topic_1 \
+ --topic ${topic_prefix}_${topic_id} \
--group $mirror_console_consumer_grp \
- --from-beginning \
+ --from-beginning --consumer-timeout-ms 5000 \
--formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
- 2>&1 > ${console_consumer_mirror_log} &
- console_consumer_mirror_pid=$!
+ --property topic=${topic_prefix}_${topic_id} \
+ 2>&1 >> ${console_consumer_mirror_log}
+}
- info " -> console consumer mirror pid: $console_consumer_mirror_pid"
+consume_source_producer_messages() {
+ consumer_counter=1
+ while [ $consumer_counter -le $max_topic_id ]
+ do
+ start_console_consumer_for_source_producer $consumer_counter
+ consumer_counter=$(( $consumer_counter + 1 ))
+ done
+}
+
+consume_mirror_producer_messages() {
+ consumer_counter=1
+ while [ $consumer_counter -le $max_topic_id ]
+ do
+ start_console_consumer_for_mirror_producer $consumer_counter
+ consumer_counter=$(( $consumer_counter + 1 ))
+ done
}
shutdown_producer() {
info "shutting down producer"
- if [ "x${background_producer_pid}" != "x" ]; then
- # kill_child_processes 0 ${background_producer_pid};
- kill -TERM ${background_producer_pid} 2> /dev/null;
+ if [ "x${background_producer_pid_1}" != "x" ]; then
+ # kill_child_processes 0 ${background_producer_pid_1};
+ kill -TERM ${background_producer_pid_1} 2> /dev/null;
+ fi
+
+ if [ "x${background_producer_pid_2}" != "x" ]; then
+ # kill_child_processes 0 ${background_producer_pid_2};
+ kill -TERM ${background_producer_pid_2} 2> /dev/null;
fi
}
@@ -450,13 +501,15 @@ shutdown_servers() {
}
start_background_producer() {
+ bkrinfo_str=$1
+ start_topic_id=$2
+ end_topic_id=$3
batch_no=0
- curr_iter=0
+ topic_id=${start_topic_id}
- while [ $num_iterations -gt $curr_iter ]
+ while [ 'x' == 'x' ]
do
- topic=$1
sleeptime=
get_random_range $sleep_min $sleep_max
@@ -464,19 +517,24 @@ start_background_producer() {
batch_no=$(($batch_no + 1))
+ if [ $topic_id -gt $end_topic_id ]; then
+ topic_id=${start_topic_id}
+ fi
+
$base_dir/bin/kafka-run-class.sh \
kafka.perf.ProducerPerformance \
- --brokerinfo zk.connect=localhost:2181 \
- --topic $topic \
+ --brokerinfo $bkrinfo_str \
+ --topic ${topic_prefix}_${topic_id} \
--messages $num_msg_per_batch \
--message-size $message_size \
--batch-size 50 \
--vary-message-size \
--threads 1 \
- --reporting-interval $num_msg_per_batch \
- --async \
+ --reporting-interval $num_msg_per_batch --async \
2>&1 >> $base_dir/producer_performance.log # appending all producers' msgs
+ topic_id=$(( $topic_id + 1 ))
+
sleep $sleeptime
done
}
@@ -485,9 +543,9 @@ cmp_checksum() {
cmp_result=0
- grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
- grep ^checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
- grep ^checksum $producer_performance_log | tr -d ' ' | cut -f2 -d ':' > $producer_performance_crc_log
+ grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
+ grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
+ grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log
sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log
sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log
@@ -555,6 +613,37 @@ cmp_checksum() {
echo "========================================================" >> $checksum_diff_log
echo "${duplicate_mirror_crc}" >> $checksum_diff_log
+ topic_chksum_counter=1
+ while [ $topic_chksum_counter -le $max_topic_id ]
+ do
+ # get producer topic counts
+ this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log`
+ echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+ topic_chksum_counter=$(($topic_chksum_counter + 1))
+ done
+ echo
+
+ topic_chksum_counter=1
+ while [ $topic_chksum_counter -le $max_topic_id ]
+ do
+ this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log`
+ echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+ topic_chksum_counter=$(($topic_chksum_counter + 1))
+ done
+ echo
+
+ topic_chksum_counter=1
+ while [ $topic_chksum_counter -le $max_topic_id ]
+ do
+ this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log`
+ echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+ topic_chksum_counter=$(($topic_chksum_counter + 1))
+ done
+ echo
+
return $cmp_result
}
@@ -567,15 +656,32 @@ start_test() {
start_target_servers_cluster
sleep 2
- start_background_producer $topic_1 &
- background_producer_pid=$!
+ start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) &
+ background_producer_pid_1=$!
info "=========================================="
- info "Started background producer pid [${background_producer_pid}]"
+ info "Started background producer pid [${background_producer_pid_1}]"
info "=========================================="
- sleep 5
-
+ sleep 10
+
+ start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id &
+ background_producer_pid_2=$!
+
+ info "=========================================="
+ info "Started background producer pid [${background_producer_pid_2}]"
+ info "=========================================="
+
+ sleep 10
+
+ verify_consumer_rebalancing
+
+ info "abort_test: [${abort_test}]"
+ if [ "${abort_test}_x" == "true_x" ]; then
+ info "aborting test"
+ iter=$((${num_iterations} + 1))
+ fi
+
while [ $num_iterations -ge $iter ]
do
echo
@@ -592,7 +698,6 @@ start_test() {
# even iterations -> bounce target kafka borker
get_random_range 1 $num_kafka_target_server
idx=$?
-
if [ "x${kafka_target_pids[$idx]}" != "x" ]; then
echo
info "#### Bouncing kafka TARGET broker ####"
@@ -631,7 +736,15 @@ start_test() {
sleep $wait_time_after_restarting_broker
fi
fi
+
verify_consumer_rebalancing
+
+ info "abort_test: [${abort_test}]"
+ if [ "${abort_test}_x" == "true_x" ]; then
+ info "aborting test"
+ iter=$((${num_iterations} + 1))
+ fi
+
else
info "No bouncing performed"
fi
@@ -670,8 +783,8 @@ trap "shutdown_producer; shutdown_server
start_test
-start_console_consumer_for_source_producer
-start_console_consumer_for_mirror_producer
+consume_source_producer_messages
+consume_mirror_producer_messages
wait_for_zero_source_console_consumer_lags
wait_for_zero_mirror_console_consumer_lags
Modified: incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties Tue Feb 7 17:38:15 2012
@@ -12,30 +12,74 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=INFO, stdout, kafkaAppender
+
+# ====================================
+# messages going to kafkaAppender
+# ====================================
+log4j.logger.kafka=DEBUG, kafkaAppender
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, kafkaAppender
+log4j.logger.org.apache.zookeeper=INFO, kafkaAppender
+
+# ====================================
+# messages going to zookeeperAppender
+# ====================================
+# (comment out this line to redirect ZK-related messages to kafkaAppender
+# to allow reading both Kafka and ZK debugging messages in a single file)
+#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
+
+# ====================================
+# stdout
+# ====================================
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
-
-
-# Turn on all our debugging info
-#log4j.logger.kafka=INFO
-log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
-log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.kafka.consumer=DEBUG
-log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
-log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
+# ====================================
+# fileAppender
+# ====================================
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=/tmp/kafka_all_request.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# ====================================
+# kafkaAppender
+# ====================================
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.File=/tmp/kafka.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.additivity.kafka=true
+
+# ====================================
+# zookeeperAppender
+# ====================================
+log4j.appender.zookeeperAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.zookeeperAppender.File=/tmp/zookeeper.log
+log4j.appender.zookeeperAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.zookeeperAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.additivity.org.apache.zookeeper=false
+
+# ====================================
+# other available debugging info
+# ====================================
+#log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
+#log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
#log4j.logger.kafka.producer.async.AsyncProducer=TRACE
#log4j.logger.kafka.producer.async.ProducerSendThread=TRACE
-log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+#log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+
+log4j.logger.kafka.consumer=DEBUG
log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
# to print message checksum from ProducerPerformance
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+
+# to print socket buffer size validated by Kafka broker
+log4j.logger.kafka.network.Acceptor=DEBUG
+
+# to print socket buffer size validated by SimpleConsumer
+log4j.logger.kafka.consumer.SimpleConsumer=TRACE
Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer1.properties Tue Feb 7 17:38:15 2012
@@ -15,7 +15,7 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9093
+broker.list=0:localhost:9081
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer2.properties Tue Feb 7 17:38:15 2012
@@ -15,7 +15,7 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9094
+broker.list=0:localhost:9082
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/mirror_producer3.properties Tue Feb 7 17:38:15 2012
@@ -15,7 +15,7 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9095
+broker.list=0:localhost:9083
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source1.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=1
num.partitions=1
# the port the socket server runs on
-port=9092
+port=9091
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source2.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=2
num.partitions=1
# the port the socket server runs on
-port=9091
+port=9092
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source3.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=3
num.partitions=1
# the port the socket server runs on
-port=9090
+port=9093
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_source4.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=4
num.partitions=1
# the port the socket server runs on
-port=9096
+port=9094
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target1.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=1
num.partitions=1
# the port the socket server runs on
-port=9093
+port=9081
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target2.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=2
num.partitions=1
# the port the socket server runs on
-port=9094
+port=9082
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/server_target3.properties Tue Feb 7 17:38:15 2012
@@ -26,7 +26,7 @@ brokerid=3
num.partitions=1
# the port the socket server runs on
-port=9095
+port=9083
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
Modified: incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties?rev=1241528&r1=1241527&r2=1241528&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/whitelisttest.consumer.properties Tue Feb 7 17:38:15 2012
@@ -25,5 +25,5 @@ zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1
-mirror.topics.whitelist=test01
-
+mirror.topics.whitelist=test_1,test_2
+autooffset.reset=smallest