You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/14 07:19:34 UTC

svn commit: r1243786 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer: SyncProducer.scala SyncProducerConfig.scala

Author: junrao
Date: Tue Feb 14 06:19:34 2012
New Revision: 1243786

URL: http://svn.apache.org/viewvc?rev=1243786&view=rev
Log:
time-based reconnect in producer; patched by Yang Ye; reviewed by Jun Rao; KAFKA-268

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1243786&r1=1243785&r2=1243786&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Tue Feb 14 06:19:34 2012
@@ -40,6 +40,8 @@ class SyncProducer(val config: SyncProdu
   private val MaxConnectBackoffMs = 60000
   private var channel : SocketChannel = null
   private var sentOnConnection = 0
+  private var lastConnectionTime = System.currentTimeMillis
+
   private val lock = new Object()
   @volatile
   private var shutdown: Boolean = false
@@ -94,10 +96,12 @@ class SyncProducer(val config: SyncProdu
       }
       // TODO: do we still need this?
       sentOnConnection += 1
-      if(sentOnConnection >= config.reconnectInterval) {
+
+      if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) {
         disconnect()
         channel = connect()
         sentOnConnection = 0
+        lastConnectionTime = System.currentTimeMillis
       }
       val endTime = SystemTime.nanoseconds
       SyncProducerStats.recordProduceRequest(endTime - startTime)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1243786&r1=1243785&r2=1243786&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Tue Feb 14 06:19:34 2012
@@ -40,5 +40,8 @@ trait SyncProducerConfigShared {
 
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
+  /** negative reconnect time interval means disabling this time-based reconnect feature */
+  var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
+
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
 }