You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/20 23:24:52 UTC

svn commit: r1291490 - /incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala

Author: junrao
Date: Mon Feb 20 22:24:52 2012
New Revision: 1291490

URL: http://svn.apache.org/viewvc?rev=1291490&view=rev
Log:
make time-based reconnect starting at a random time; patched by Yang Ye; reviewed by Jun Rao; KAFKA-268

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1291490&r1=1291489&r2=1291490&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Feb 20 22:24:52 2012
@@ -26,9 +26,11 @@ import kafka.api._
 import scala.math._
 import kafka.common.MessageSizeTooLargeException
 import java.nio.ByteBuffer
+import java.util.Random
 
 object SyncProducer {
   val RequestKey: Short = 0
+  val randomGenerator = new Random
 }
 
 /*
@@ -40,7 +42,8 @@ class SyncProducer(val config: SyncProdu
   private val MaxConnectBackoffMs = 60000
   private var channel : SocketChannel = null
   private var sentOnConnection = 0
-  private var lastConnectionTime = System.currentTimeMillis
+  /** make time-based reconnect starting at a random time **/
+  private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval
 
   private val lock = new Object()
   @volatile