Posted to commits@kafka.apache.org by ij...@apache.org on 2016/11/24 14:51:48 UTC

kafka git commit: KAFKA-4384; ReplicaFetcherThread stops after receiving a corrupted message

Repository: kafka
Updated Branches:
  refs/heads/trunk e035fc039 -> 3e3b7a010


KAFKA-4384; ReplicaFetcherThread stops after receiving a corrupted message

Author: Jun He <ju...@airbnb.com>

Reviewers: Jiangjie (Becket) Qin <be...@gmail.com>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2127 from jun-he/KAFKA-4384
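
In short: before this change, AbstractFetcherThread advanced partitionStates to the last fetched message's nextOffset before handing the data to processPartitionData. If processPartitionData threw a CorruptRecordException, the error was logged and the loop continued, but the fetch offset had already moved past the corrupt data; on the next cycle the fetcher detected the offset mismatch and threw, killing the thread. The patch defers the state update until processPartitionData returns, and flags the partition for a delayed retry on corruption. A minimal, self-contained sketch of the corrected ordering (hypothetical names, not the Kafka API):

    // Sketch: advance fetch state only after processing succeeds (hypothetical types).
    object FetchLoopSketch {
      final case class FetchState(offset: Long)

      def handleFetchedData(current: FetchState,
                            nextOffsets: Seq[Long],          // nextOffset of each fetched message
                            process: Seq[Long] => Unit,      // may throw on corrupt data
                            commit: FetchState => Unit,      // persists the new fetch state
                            markError: () => Unit): Unit = { // schedules a delayed retry
        val newOffset = nextOffsets.lastOption.getOrElse(current.offset)
        try {
          process(nextOffsets)               // hand the data off first
          if (nextOffsets.nonEmpty)
            commit(FetchState(newOffset))    // only now move the fetch offset forward
        } catch {
          case _: RuntimeException =>
            markError()                      // retry the same offset later; don't kill the thread
        }
      }
    }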


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e3b7a01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e3b7a01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e3b7a01

Branch: refs/heads/trunk
Commit: 3e3b7a010be9959ac6f205a0ea66a2595bb62ae6
Parents: e035fc0
Author: Jun He <ju...@airbnb.com>
Authored: Thu Nov 24 13:10:19 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 24 14:44:54 2016 +0000

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherThread.scala    | 18 ++--
 .../server/AbstractFetcherThreadTest.scala      | 99 ++++++++++++++++++--
 2 files changed, 101 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e3b7a01/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 30f5125..b3a8751 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -145,18 +145,19 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.NONE =>
                   try {
                     val messages = partitionData.toByteBufferMessageSet
-                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                      case Some(m) =>
-                        partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(m.nextOffset))
-                        fetcherStats.byteRate.mark(messages.validBytes)
-                        m.nextOffset
-                      case None =>
-                        currentPartitionFetchState.offset
-                    }
+                    val newOffset = messages.shallowIterator.toSeq.lastOption.map(_.nextOffset).getOrElse(
+                      currentPartitionFetchState.offset)
 
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+
+                    val validBytes = messages.validBytes
+                    if (validBytes > 0) {
+                      // Update partitionStates only if there is no exception during processPartitionData
+                      partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
+                      fetcherStats.byteRate.mark(validBytes)
+                    }
                   } catch {
                     case ime: CorruptRecordException =>
                       // we log the error and continue. This ensures two things
@@ -164,6 +165,7 @@ abstract class AbstractFetcherThread(name: String,
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       // should get fixed in the subsequent fetches
                       logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
+                      updatePartitionsWithError(topicPartition)
                     case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                         .format(topic, partitionId, currentPartitionFetchState.offset), e)
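
The other half of the fix is the new updatePartitionsWithError call in the CorruptRecordException handler: the affected partition is delayed (see delayPartitions in the test below) and dropped from fetch requests until the back-off elapses, instead of being re-fetched in a tight loop. A rough stand-in for that gating, assuming a hypothetical DelayGate class rather than Kafka's PartitionStates machinery:

    // Sketch: a partition flagged with an error goes inactive until its back-off elapses
    // (hypothetical stand-in for updatePartitionsWithError/delayPartitions).
    import scala.collection.concurrent.TrieMap

    class DelayGate[P](backOffMs: Long) {
      private val retryAt = TrieMap.empty[P, Long]

      // Called when a fetch for this partition hit a corrupt record.
      def markError(partition: P): Unit =
        retryAt.put(partition, System.currentTimeMillis() + backOffMs)

      // Only active partitions are included in the next fetch request.
      def isActive(partition: P): Boolean =
        retryAt.get(partition).forall(System.currentTimeMillis() >= _)
    }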

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e3b7a01/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 1cd2496..7d6ad91 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -19,16 +19,17 @@ package kafka.server
 
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
-import kafka.message.ByteBufferMessageSet
+import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Utils
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{mutable, Map}
 
 class AbstractFetcherThreadTest {
 
@@ -90,10 +91,10 @@ class AbstractFetcherThreadTest {
     override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition)
   }
 
-  class DummyPartitionData extends PartitionData {
+  class TestPartitionData(byteBufferMessageSet: ByteBufferMessageSet) extends PartitionData {
     override def errorCode: Short = Errors.NONE.code
 
-    override def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet()
+    override def toByteBufferMessageSet: ByteBufferMessageSet = byteBufferMessageSet
 
     override def highWatermark: Long = 0L
 
@@ -102,8 +103,9 @@ class AbstractFetcherThreadTest {
 
   class DummyFetcherThread(name: String,
                            clientId: String,
-                           sourceBroker: BrokerEndPoint)
-    extends AbstractFetcherThread(name, clientId, sourceBroker, 0) {
+                           sourceBroker: BrokerEndPoint,
+                           fetchBackOffMs: Int = 0)
+    extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
 
     type REQ = DummyFetchRequest
     type PD = PartitionData
@@ -116,11 +118,92 @@ class AbstractFetcherThreadTest {
 
     override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {}
 
-    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, DummyPartitionData)] =
-      fetchRequest.offsets.mapValues(_ => new DummyPartitionData).toSeq
+    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] =
+      fetchRequest.offsets.mapValues(_ => new TestPartitionData(new ByteBufferMessageSet())).toSeq
 
     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
       new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
   }
 
+
+  @Test
+  def testFetchRequestCorruptedMessageException() {
+    val partition = new TopicPartition("topic", 0)
+    val fetcherThread = new CorruptingFetcherThread("test", "client", new BrokerEndPoint(0, "localhost", 9092),
+      fetchBackOffMs = 1)
+
+    fetcherThread.start()
+
+    // Add one partition for fetching
+    fetcherThread.addPartitions(Map(partition -> 0L))
+
+    // Wait until fetcherThread finishes the work
+    TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread to finish the work")
+
+    fetcherThread.shutdown()
+
+    // The fetcherThread should have fetched two normal messages
+    assertTrue(fetcherThread.logEndOffset == 2)
+  }
+
+  class CorruptingFetcherThread(name: String,
+                                clientId: String,
+                                sourceBroker: BrokerEndPoint,
+                                fetchBackOffMs: Int = 0)
+    extends DummyFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
+
+    @volatile var logEndOffset = 0L
+    @volatile var fetchCount = 0
+
+    private val normalPartitionDataSet = List(
+      new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(0L), new Message("hello".getBytes))),
+      new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(1L), new Message("hello".getBytes)))
+    )
+
+    override def processPartitionData(topicAndPartition: TopicPartition,
+                                      fetchOffset: Long,
+                                      partitionData: PartitionData): Unit = {
+      // Throw exception if the fetchOffset does not match the fetcherThread partition state
+      if (fetchOffset != logEndOffset)
+        throw new RuntimeException(
+          "Offset mismatch for partition %s: fetched offset = %d, log end offset = %d."
+            .format(topicAndPartition, fetchOffset, logEndOffset))
+
+      // Now check message's crc
+      val messages = partitionData.toByteBufferMessageSet
+      for (messageAndOffset <- messages.shallowIterator) {
+        messageAndOffset.message.ensureValid()
+        logEndOffset = messageAndOffset.nextOffset
+      }
+    }
+
+    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] = {
+      fetchCount += 1
+      // Set the first fetch to get a corrupted message
+      if (fetchCount == 1) {
+        val corruptedMessage = new Message("hello".getBytes)
+        val badChecksum = ((corruptedMessage.checksum + 1) % Int.MaxValue).toInt
+        // Garble checksum
+        Utils.writeUnsignedInt(corruptedMessage.buffer, Message.CrcOffset, badChecksum)
+        val byteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, corruptedMessage)
+        fetchRequest.offsets.mapValues(_ => new TestPartitionData(byteBufferMessageSet)).toSeq
+      } else
+        // Then, the following fetches get the normal data
+        fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq
+    }
+
+    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = {
+      val requestMap = new mutable.HashMap[TopicPartition, Long]
+      partitionMap.foreach { case (topicPartition, partitionFetchState) =>
+        // Add backoff delay check
+        if (partitionFetchState.isActive)
+          requestMap.put(topicPartition, partitionFetchState.offset)
+      }
+      new DummyFetchRequest(requestMap)
+    }
+
+    override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) = delayPartitions(partitions, fetchBackOffMs.toLong)
+
+  }
+
 }
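
For reference, the test fabricates the corruption by overwriting the stored CRC field, so that ensureValid() on the first fetch recomputes the checksum, sees a mismatch, and throws; subsequent fetches return clean data and the thread is expected to recover to logEndOffset 2. The detection amounts to this kind of check (a hypothetical standalone version using java.util.zip.CRC32, not Kafka's Message class):

    // Sketch: detecting a garbled record by recomputing its CRC32
    // (hypothetical standalone check in the spirit of Message.ensureValid()).
    import java.util.zip.CRC32

    object CrcCheckSketch {
      def checksum(bytes: Array[Byte]): Long = {
        val crc = new CRC32
        crc.update(bytes)
        crc.getValue // unsigned 32-bit CRC as a Long
      }

      // Throws if the stored checksum no longer matches the payload.
      def ensureValid(payload: Array[Byte], storedCrc: Long): Unit = {
        val computed = checksum(payload)
        if (computed != storedCrc)
          throw new RuntimeException(s"Corrupt record: stored CRC $storedCrc != computed $computed")
      }
    }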