You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 02:46:09 UTC

svn commit: r1377166 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/cluster/Partition.scala main/scala/kafka/server/KafkaApis.scala test/scala/unit/kafka/integration/PrimitiveApiTest.scala

Author: junrao
Date: Sat Aug 25 00:46:08 2012
New Revision: 1377166

URL: http://svn.apache.org/viewvc?rev=1377166&view=rev
Log:
maintain HW correctly with only 1 replica in ISR; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-420

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1377166&r1=1377165&r2=1377166&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sat Aug 25 00:46:08 2012
@@ -131,6 +131,8 @@ class Partition(val topic: String,
       leaderEpoch = leaderAndISR.leaderEpoch
       zkVersion = leaderAndISR.zkVersion
       leaderReplicaIdOpt = Some(localBrokerId)
+      // we may need to increment high watermark since ISR could be down to 1
+      maybeIncrementLeaderHW(getReplica().get)
   }
 
   /**
@@ -210,7 +212,7 @@ class Partition(val topic: String,
     }
   }
   
-  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
+  def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
     val oldHighWatermark = leaderReplica.highWatermark
@@ -232,6 +234,8 @@ class Partition(val topic: String,
             info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
             updateISR(newInSyncReplicas)
+            // we may need to increment high watermark since ISR could be down to 1
+            maybeIncrementLeaderHW(leaderReplica)
           }
         case None => // do nothing if no longer leader
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1377166&r1=1377165&r2=1377166&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Aug 25 00:46:08 2012
@@ -184,6 +184,8 @@ class KafkaApis(val requestChannel: Requ
           val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
           val log = localReplica.log.get
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+          // we may need to increment high watermark since ISR could be down to 1
+          localReplica.partition.maybeIncrementLeaderHW(localReplica)
           offsets(msgIndex) = log.logEndOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace("%d bytes written to logs, nextAppendOffset = %d"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1377166&r1=1377165&r2=1377166&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Sat Aug 25 00:46:08 2012
@@ -110,6 +110,10 @@ class PrimitiveApiTest extends JUnit3Sui
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
 
+    val replica = servers.head.replicaManager.getReplica(topic, 0).get
+    assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
+               replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
+
     val request = new FetchRequestBuilder()
       .correlationId(100)
       .clientId("test-client")