Posted to commits@kafka.apache.org by ij...@apache.org on 2016/11/24 14:51:48 UTC
kafka git commit: KAFKA-4384; ReplicaFetcherThread stops after ReplicaFetcherThread receives a corrupted message
Repository: kafka
Updated Branches:
refs/heads/trunk e035fc039 -> 3e3b7a010
KAFKA-4384; ReplicaFetcherThread stops after ReplicaFetcherThread receives a corrupted message
Author: Jun He <ju...@airbnb.com>
Reviewers: Jiangjie (Becket) Qin <be...@gmail.com>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2127 from jun-he/KAFKA-4384
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e3b7a01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e3b7a01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e3b7a01
Branch: refs/heads/trunk
Commit: 3e3b7a010be9959ac6f205a0ea66a2595bb62ae6
Parents: e035fc0
Author: Jun He <ju...@airbnb.com>
Authored: Thu Nov 24 13:10:19 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 24 14:44:54 2016 +0000
----------------------------------------------------------------------
.../kafka/server/AbstractFetcherThread.scala | 18 ++--
.../server/AbstractFetcherThreadTest.scala | 99 ++++++++++++++++++--
2 files changed, 101 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
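At a high level, the fix is an ordering change in AbstractFetcherThread: the next offset is computed up front, the partition data is handed to processPartitionData first, and the partition's fetch state is advanced only after that call returns successfully and the fetch actually contained valid bytes. If processPartitionData throws CorruptRecordException, the partition is now handed to updatePartitionsWithError so it is retried after a backoff instead of leaving the fetch state ahead of what was processed. The following is a simplified paraphrase of that control flow, not the actual Kafka code; handleFetchedData, process, advanceFetchState and markPartitionWithError are placeholder names:

  // Simplified sketch of the new ordering (placeholder names, not the real API)
  def handleFetchedData(tp: TopicPartition,
                        state: PartitionFetchState,
                        data: PartitionData): Unit = {
    try {
      val messages = data.toByteBufferMessageSet
      // Compute the next offset up front, but do not commit it yet
      val newOffset = messages.shallowIterator.toSeq.lastOption
        .map(_.nextOffset).getOrElse(state.offset)

      // Hand the data to the subclass first; CRC validation happens in there and may throw
      process(tp, state.offset, data)

      // Advance the fetch state only after processing succeeded
      if (messages.validBytes > 0)
        advanceFetchState(tp, newOffset)
    } catch {
      case _: org.apache.kafka.common.errors.CorruptRecordException =>
        // Previously the state had already been advanced before processPartitionData ran,
        // so a corrupted message left the fetch offset ahead of the log end offset and the
        // replica fetcher could die on the resulting offset mismatch. Now the partition is
        // flagged and retried after a backoff.
        markPartitionWithError(tp)
    }
  }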
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e3b7a01/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 30f5125..b3a8751 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -145,18 +145,19 @@ abstract class AbstractFetcherThread(name: String,
case Errors.NONE =>
try {
val messages = partitionData.toByteBufferMessageSet
- val newOffset = messages.shallowIterator.toSeq.lastOption match {
- case Some(m) =>
- partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(m.nextOffset))
- fetcherStats.byteRate.mark(messages.validBytes)
- m.nextOffset
- case None =>
- currentPartitionFetchState.offset
- }
+ val newOffset = messages.shallowIterator.toSeq.lastOption.map(_.nextOffset).getOrElse(
+ currentPartitionFetchState.offset)
fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+
+ val validBytes = messages.validBytes
+ if (validBytes > 0) {
+ // Update partitionStates only if there is no exception during processPartitionData
+ partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
+ fetcherStats.byteRate.mark(validBytes)
+ }
} catch {
case ime: CorruptRecordException =>
// we log the error and continue. This ensures two things
@@ -164,6 +165,7 @@ abstract class AbstractFetcherThread(name: String,
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
+ updatePartitionsWithError(topicPartition);
case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
.format(topic, partitionId, currentPartitionFetchState.offset), e)
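For context on the new updatePartitionsWithError(topicPartition) call above: AbstractFetcherThread routes such partitions to handlePartitionsWithErrors, and a concrete fetcher is expected to delay them rather than drop them, so the corrupted fetch is simply retried. A minimal sketch of that wiring, reusing the DummyFetcherThread test helper shown further down (the class name here is made up for illustration; the real ReplicaFetcherThread does the equivalent with replica.fetch.backoff.ms):

  // Illustrative subclass: partitions flagged via updatePartitionsWithError are
  // retried after fetchBackOffMs instead of being removed from the fetch loop.
  class BackoffOnErrorFetcherThread(name: String,
                                    clientId: String,
                                    sourceBroker: BrokerEndPoint,
                                    fetchBackOffMs: Int)
    extends DummyFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {

    override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit =
      delayPartitions(partitions, fetchBackOffMs.toLong)
  }

The CorruptingFetcherThread added by the new test below uses exactly this override.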
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e3b7a01/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 1cd2496..7d6ad91 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -19,16 +19,17 @@ package kafka.server
import com.yammer.metrics.Metrics
import kafka.cluster.BrokerEndPoint
-import kafka.message.ByteBufferMessageSet
+import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Utils
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.{Before, Test}
import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{mutable, Map}
class AbstractFetcherThreadTest {
@@ -90,10 +91,10 @@ class AbstractFetcherThreadTest {
override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition)
}
- class DummyPartitionData extends PartitionData {
+ class TestPartitionData(byteBufferMessageSet: ByteBufferMessageSet) extends PartitionData {
override def errorCode: Short = Errors.NONE.code
- override def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet()
+ override def toByteBufferMessageSet: ByteBufferMessageSet = byteBufferMessageSet
override def highWatermark: Long = 0L
@@ -102,8 +103,9 @@ class AbstractFetcherThreadTest {
class DummyFetcherThread(name: String,
clientId: String,
- sourceBroker: BrokerEndPoint)
- extends AbstractFetcherThread(name, clientId, sourceBroker, 0) {
+ sourceBroker: BrokerEndPoint,
+ fetchBackOffMs: Int = 0)
+ extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
type REQ = DummyFetchRequest
type PD = PartitionData
@@ -116,11 +118,92 @@ class AbstractFetcherThreadTest {
override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {}
- override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, DummyPartitionData)] =
- fetchRequest.offsets.mapValues(_ => new DummyPartitionData).toSeq
+ override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] =
+ fetchRequest.offsets.mapValues(_ => new TestPartitionData(new ByteBufferMessageSet())).toSeq
override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
}
+
+ @Test
+ def testFetchRequestCorruptedMessageException() {
+ val partition = new TopicPartition("topic", 0)
+ val fetcherThread = new CorruptingFetcherThread("test", "client", new BrokerEndPoint(0, "localhost", 9092),
+ fetchBackOffMs = 1)
+
+ fetcherThread.start()
+
+ // Add one partition for fetching
+ fetcherThread.addPartitions(Map(partition -> 0L))
+
+ // Wait until fetcherThread finishes the work
+ TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread to finish the work")
+
+ fetcherThread.shutdown()
+
+ // The fetcherThread should have fetched two normal messages
+ assertTrue(fetcherThread.logEndOffset == 2)
+ }
+
+ class CorruptingFetcherThread(name: String,
+ clientId: String,
+ sourceBroker: BrokerEndPoint,
+ fetchBackOffMs: Int = 0)
+ extends DummyFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
+
+ @volatile var logEndOffset = 0L
+ @volatile var fetchCount = 0
+
+ private val normalPartitionDataSet = List(
+ new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(0L), new Message("hello".getBytes))),
+ new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(1L), new Message("hello".getBytes)))
+ )
+
+ override def processPartitionData(topicAndPartition: TopicPartition,
+ fetchOffset: Long,
+ partitionData: PartitionData): Unit = {
+ // Throw exception if the fetchOffset does not match the fetcherThread partition state
+ if (fetchOffset != logEndOffset)
+ throw new RuntimeException(
+ "Offset mismatch for partition %s: fetched offset = %d, log end offset = %d."
+ .format(topicAndPartition, fetchOffset, logEndOffset))
+
+ // Now check message's crc
+ val messages = partitionData.toByteBufferMessageSet
+ for (messageAndOffset <- messages.shallowIterator) {
+ messageAndOffset.message.ensureValid()
+ logEndOffset = messageAndOffset.nextOffset
+ }
+ }
+
+ override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] = {
+ fetchCount += 1
+ // Set the first fetch to get a corrupted message
+ if (fetchCount == 1) {
+ val corruptedMessage = new Message("hello".getBytes)
+ val badChecksum = ((corruptedMessage.checksum + 1) % Int.MaxValue).toInt
+ // Garble checksum
+ Utils.writeUnsignedInt(corruptedMessage.buffer, Message.CrcOffset, badChecksum)
+ val byteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, corruptedMessage)
+ fetchRequest.offsets.mapValues(_ => new TestPartitionData(byteBufferMessageSet)).toSeq
+ } else
+ // Then, the following fetches get the normal data
+ fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq
+ }
+
+ override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = {
+ val requestMap = new mutable.HashMap[TopicPartition, Long]
+ partitionMap.foreach { case (topicPartition, partitionFetchState) =>
+ // Add backoff delay check
+ if (partitionFetchState.isActive)
+ requestMap.put(topicPartition, partitionFetchState.offset)
+ }
+ new DummyFetchRequest(requestMap)
+ }
+
+ override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) = delayPartitions(partitions, fetchBackOffMs.toLong)
+
+ }
+
}
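A note on how the test manufactures corruption: the first fetch returns a message whose stored CRC has been overwritten, so ensureValid() inside processPartitionData throws and only the subsequent, clean fetches advance logEndOffset to 2. The same trick can be reproduced standalone with the old kafka.message API used on this branch (the object name here is illustrative):

  // Standalone illustration of the CRC-corruption step used by the test above.
  import kafka.message.Message
  import org.apache.kafka.common.utils.Utils

  object CrcCorruptionSketch extends App {
    val message = new Message("hello".getBytes)
    message.ensureValid()  // passes: the stored CRC matches the payload
    // Overwrite the stored CRC with a value that cannot match the payload
    Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, (message.checksum + 1) % Int.MaxValue)
    try message.ensureValid()  // now fails CRC validation and throws
    catch { case e: Throwable => println(s"corruption detected: $e") }
  }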