You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 02:46:09 UTC
svn commit: r1377166 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/cluster/Partition.scala
main/scala/kafka/server/KafkaApis.scala
test/scala/unit/kafka/integration/PrimitiveApiTest.scala
Author: junrao
Date: Sat Aug 25 00:46:08 2012
New Revision: 1377166
URL: http://svn.apache.org/viewvc?rev=1377166&view=rev
Log:
Maintain the high watermark (HW) correctly when only 1 replica is in the ISR; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-420
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1377166&r1=1377165&r2=1377166&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sat Aug 25 00:46:08 2012
@@ -131,6 +131,8 @@ class Partition(val topic: String,
leaderEpoch = leaderAndISR.leaderEpoch
zkVersion = leaderAndISR.zkVersion
leaderReplicaIdOpt = Some(localBrokerId)
+ // we may need to increment high watermark since ISR could be down to 1
+ maybeIncrementLeaderHW(getReplica().get)
}
/**
@@ -210,7 +212,7 @@ class Partition(val topic: String,
}
}
- private def maybeIncrementLeaderHW(leaderReplica: Replica) {
+ def maybeIncrementLeaderHW(leaderReplica: Replica) {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min
val oldHighWatermark = leaderReplica.highWatermark
@@ -232,6 +234,8 @@ class Partition(val topic: String,
info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
updateISR(newInSyncReplicas)
+ // we may need to increment high watermark since ISR could be down to 1
+ maybeIncrementLeaderHW(leaderReplica)
}
case None => // do nothing if no longer leader
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1377166&r1=1377165&r2=1377166&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Aug 25 00:46:08 2012
@@ -184,6 +184,8 @@ class KafkaApis(val requestChannel: Requ
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
val log = localReplica.log.get
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+ // we may need to increment high watermark since ISR could be down to 1
+ localReplica.partition.maybeIncrementLeaderHW(localReplica)
offsets(msgIndex) = log.logEndOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
trace("%d bytes written to logs, nextAppendOffset = %d"
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1377166&r1=1377165&r2=1377166&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Sat Aug 25 00:46:08 2012
@@ -110,6 +110,10 @@ class PrimitiveApiTest extends JUnit3Sui
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
+ val replica = servers.head.replicaManager.getReplica(topic, 0).get
+ assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
+ replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
+
val request = new FetchRequestBuilder()
.correlationId(100)
.clientId("test-client")