Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/10 20:03:30 UTC

git commit: Fault tolerance broken with replication factor 1; kafka-691; patched by Maxime Brugidou; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 ddd66cbd8 -> b71e6dc35


Fault tolerance broken with replication factor 1; kafka-691; patched by Maxime Brugidou; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b71e6dc3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b71e6dc3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b71e6dc3

Branch: refs/heads/0.8
Commit: b71e6dc352770f22daec0c9a3682138666f032be
Parents: ddd66cb
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jan 10 11:03:19 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jan 10 11:03:19 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/producer/DefaultPartitioner.scala  |    5 +--
 .../main/scala/kafka/producer/ProducerConfig.scala |   10 ++++
 .../kafka/producer/async/DefaultEventHandler.scala |   37 ++++++++++----
 3 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b71e6dc3/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index c82670e..9bffeb6 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -24,9 +24,6 @@ private class DefaultPartitioner[T](props: VerifiableProperties = null) extends
   private val random = new java.util.Random
   
   def partition(key: T, numPartitions: Int): Int = {
-    if(key == null)
-      random.nextInt(numPartitions)
-    else
-      Utils.abs(key.hashCode) % numPartitions
+    Utils.abs(key.hashCode) % numPartitions
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71e6dc3/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index e559187..235b228 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -113,5 +113,15 @@ class ProducerConfig private (val props: VerifiableProperties)
 
   val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
 
+  /**
+   * The producer generally refreshes the topic metadata from brokers when there is a failure
+   * (partition missing, leader not available...). It will also poll regularly (default: every 10 min,
+   * i.e. 600000 ms). If you set this to a negative value, metadata will only get refreshed on failure.
+   * If you set this to zero, the metadata will get refreshed after each message sent (not recommended).
+   * Important note: the refresh happens only AFTER the message is sent, so if the producer never sends
+   * a message, the metadata is never refreshed.
+   */
+  val topicMetadataRefreshIntervalMs = props.getInt("producer.metadata.refresh.interval.ms", 600000)
+
   validate(this)
 }
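
For reference, the new producer.metadata.refresh.interval.ms property introduced above is read like any other producer property. The snippet below is a minimal sketch, not part of the patch; the broker list and serializer property names are assumptions about the 0.8 producer API.

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties()
    // Assumed 0.8 producer properties, included only so the config is complete.
    props.put("broker.list", "host1:9092,host2:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // From this patch: refresh topic metadata on the send path at most every 5 minutes.
    // A negative value refreshes only on failure; 0 refreshes after every message sent.
    props.put("producer.metadata.refresh.interval.ms", "300000")
    val config = new ProducerConfig(props)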

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71e6dc3/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 4f04862..24a9dc9 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -21,13 +21,12 @@ import kafka.common._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
-import kafka.utils.{Utils, Logging}
+import kafka.utils.{Utils, Logging, SystemTime}
 import scala.collection.{Seq, Map}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
 import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
 
-
 class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val partitioner: Partitioner[K],
                                private val encoder: Encoder[V],
@@ -43,6 +42,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 
   private val lock = new Object()
 
+  private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
+  private var lastTopicMetadataRefreshTime = 0L
+  private val topicMetadataToRefresh = Set.empty[String]
+
   private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
@@ -58,6 +61,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+        topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
+        if (topicMetadataRefreshInterval >= 0 &&
+            SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+          Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
+          topicMetadataToRefresh.clear
+          lastTopicMetadataRefreshTime = SystemTime.milliseconds
+        }
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
         if (outstandingProduceRequests.size > 0)  {
           // back off and update the topic metadata cache before attempting another send operation
@@ -133,9 +143,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
-        val totalNumPartitions = topicPartitionsList.length
-
-        val partitionIndex = getPartition(message.key, totalNumPartitions)
+        val partitionIndex = getPartition(message.key, topicPartitionsList)
         val brokerPartition = topicPartitionsList(partitionIndex)
 
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -184,17 +192,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
    * the value of partition is not between 0 and numPartitions-1
    * @param key the partition key
-   * @param numPartitions the total number of available partitions
+   * @param topicPartitionList the list of available partitions
    * @return the partition id
    */
-  private def getPartition(key: K, numPartitions: Int): Int = {
+  private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+    val numPartitions = topicPartitionList.size
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
         "\n Valid values are > 0")
     val partition =
-      if(key == null)
-        Utils.abs(partitionCounter.getAndIncrement()) % numPartitions
-      else
+      if(key == null) {
+        // If the key is null, we don't really need a partitioner, so we just send to the next
+        // available partition.
+        val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
+        if (availablePartitions.isEmpty)
+          throw new LeaderNotAvailableException("No leader for any partition")
+        val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
+        availablePartitions(index).partitionId
+      } else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
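
For reference, the null-key branch added above no longer routes through the partitioner; it round-robins over only the partitions that currently have a leader. The sketch below is a standalone illustration of that selection logic, not the actual Kafka code: PartitionAndLeader is redefined locally and a plain exception stands in for LeaderNotAvailableException.

    import java.util.concurrent.atomic.AtomicInteger

    // Simplified stand-in for the PartitionAndLeader entries returned by getPartitionListForTopic.
    case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

    val partitionCounter = new AtomicInteger(0)

    def choosePartitionForNullKey(partitions: Seq[PartitionAndLeader]): Int = {
      // Only partitions with a live leader are candidates; with replication factor 1 a dead
      // broker leaves its partitions leaderless, which is what broke fault tolerance before.
      val availablePartitions = partitions.filter(_.leaderBrokerIdOpt.isDefined)
      if (availablePartitions.isEmpty)
        throw new IllegalStateException("No leader for any partition")
      // Mask keeps the counter non-negative even after it wraps past Int.MaxValue.
      val index = (partitionCounter.getAndIncrement() & Int.MaxValue) % availablePartitions.size
      availablePartitions(index).partitionId
    }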