You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/14 07:19:34 UTC
svn commit: r1243786 - in
/incubator/kafka/trunk/core/src/main/scala/kafka/producer:
SyncProducer.scala SyncProducerConfig.scala
Author: junrao
Date: Tue Feb 14 06:19:34 2012
New Revision: 1243786
URL: http://svn.apache.org/viewvc?rev=1243786&view=rev
Log:
time-based reconnect in producer; patched by Yang Ye; reviewed by Jun Rao; KAFKA-268
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1243786&r1=1243785&r2=1243786&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Tue Feb 14 06:19:34 2012
@@ -40,6 +40,8 @@ class SyncProducer(val config: SyncProdu
private val MaxConnectBackoffMs = 60000
private var channel : SocketChannel = null
private var sentOnConnection = 0
+ private var lastConnectionTime = System.currentTimeMillis
+
private val lock = new Object()
@volatile
private var shutdown: Boolean = false
@@ -94,10 +96,12 @@ class SyncProducer(val config: SyncProdu
}
// TODO: do we still need this?
sentOnConnection += 1
- if(sentOnConnection >= config.reconnectInterval) {
+
+ if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) {
disconnect()
channel = connect()
sentOnConnection = 0
+ lastConnectionTime = System.currentTimeMillis
}
val endTime = SystemTime.nanoseconds
SyncProducerStats.recordProduceRequest(endTime - startTime)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1243786&r1=1243785&r2=1243786&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Tue Feb 14 06:19:34 2012
@@ -40,5 +40,8 @@ trait SyncProducerConfigShared {
val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+ /** reconnect time interval in milliseconds; a negative value disables time-based reconnect */
+ var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
+
val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
}