You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/05/10 20:45:36 UTC

kafka git commit: KAFKA-5099; Replica Deletion Regression from KIP-101

Repository: kafka
Updated Branches:
  refs/heads/trunk 0eedd6d80 -> a420d20c0


KAFKA-5099; Replica Deletion Regression from KIP-101

Replica deletion regressed with KIP-101. Replica deletion happens when a broker receives a StopReplicaRequest with delete=true. Ever since KAFKA-1911, replica deletion has been asynchronous: the broker sends a StopReplicaResponse as soon as it has marked the replica directory as staged for deletion. The marking is done by moving a data log directory and its contents, such as /tmp/kafka-logs1/t1-0, to a marked directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, which acts as a soft delete. A scheduled thread later deletes the data for real. The regression appears to occur while that scheduled thread is trying to delete the data. Because the broker has already responded, the controller considers operations such as partition reassignment and topic deletion complete. However, the log4j logs and the data log directories show that the soft-deleted data logs never actually get deleted.

The bug is that, on log deletion, we attempt to flush the LeaderEpochFileCache to the original file location instead of to the moved (soft-deleted) location. Restarting the broker does allow the soft-deleted directories to be deleted.

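This patch avoids the issue by not flushing the LeaderEpochFileCache on log deletion. To make this possible, the existing flushing operations are renamed to clearAndFlushLatest, clearAndFlushEarliest and clearAndFlush. A new clear() method empties the cache without flushing it to disk.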

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2986 from onurkaraman/KAFKA-5099


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a420d20c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a420d20c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a420d20c

Branch: refs/heads/trunk
Commit: a420d20c0e3da2c9637820f7d1a344706fd835fa
Parents: 0eedd6d
Author: Onur Karaman <ok...@linkedin.com>
Authored: Wed May 10 13:45:32 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed May 10 13:45:32 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 10 +++----
 .../server/epoch/LeaderEpochFileCache.scala     | 17 +++++++----
 .../unit/kafka/admin/DeleteTopicTest.scala      |  1 -
 .../src/test/scala/unit/kafka/log/LogTest.scala |  2 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala | 30 ++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala | 19 +++++++++++++
 6 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5722a43..d3ea251 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -162,12 +162,12 @@ class Log(@volatile var dir: File,
     nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset,
       activeSegment.size.toInt)
 
-    leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset)
+    leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    leaderEpochCache.clearEarliest(logStartOffset)
+    leaderEpochCache.clearAndFlushEarliest(logStartOffset)
 
     loadProducerState(logEndOffset)
 
@@ -997,7 +997,7 @@ class Log(@volatile var dir: File,
         // remove the segments for lookups
         deletable.foreach(deleteSegment)
         logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
-        leaderEpochCache.clearEarliest(logStartOffset)
+        leaderEpochCache.clearAndFlushEarliest(logStartOffset)
         producerStateManager.evictUnretainedProducers(logStartOffset)
         updateFirstUnstableOffset()
       }
@@ -1282,7 +1282,7 @@ class Log(@volatile var dir: File,
         updateLogEndOffset(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
         this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-        leaderEpochCache.clearLatest(targetOffset)
+        leaderEpochCache.clearAndFlushLatest(targetOffset)
         loadProducerState(targetOffset)
       }
     }
@@ -1308,7 +1308,7 @@ class Log(@volatile var dir: File,
                                 initFileSize = initFileSize,
                                 preallocate = config.preallocate))
       updateLogEndOffset(newOffset)
-      leaderEpochCache.clear()
+      leaderEpochCache.clearAndFlush()
 
       producerStateManager.truncate()
       producerStateManager.updateMapEndOffset(newOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 56b1e55..2b1ecc7 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -30,8 +30,9 @@ trait LeaderEpochCache {
   def assign(leaderEpoch: Int, offset: Long)
   def latestEpoch(): Int
   def endOffsetFor(epoch: Int): Long
-  def clearLatest(offset: Long)
-  def clearEarliest(offset: Long)
+  def clearAndFlushLatest(offset: Long)
+  def clearAndFlushEarliest(offset: Long)
+  def clearAndFlush()
   def clear()
 }
 
@@ -111,7 +112,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * @param offset
     */
-  override def clearLatest(offset: Long): Unit = {
+  override def clearAndFlushLatest(offset: Long): Unit = {
     inWriteLock(lock) {
       val before = epochs
       if (offset >= 0 && offset <= latestOffset()) {
@@ -130,7 +131,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * @param offset the offset to clear up to
     */
-  override def clearEarliest(offset: Long): Unit = {
+  override def clearAndFlushEarliest(offset: Long): Unit = {
     inWriteLock(lock) {
       val before = epochs
       if (offset >= 0 && earliestOffset() < offset) {
@@ -150,13 +151,19 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
   /**
     * Delete all entries.
     */
-  override def clear() = {
+  override def clearAndFlush() = {
     inWriteLock(lock) {
       epochs.clear()
       flush()
     }
   }
 
+  override def clear() = {
+    inWriteLock(lock) {
+      epochs.clear()
+    }
+  }
+
   def epochEntries(): ListBuffer[EpochEntry] = {
     epochs
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index caac222..2085d2d 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -234,7 +234,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.head.setProperty("log.cleaner.enable","true")
     brokerConfigs.head.setProperty("log.cleanup.policy","compact")
     brokerConfigs.head.setProperty("log.segment.bytes","100")
-    brokerConfigs.head.setProperty("log.segment.delete.delay.ms","1000")
     brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
     val servers = createTestTopicAndCluster(topic,brokerConfigs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b11c94b..b4fe9fb 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2035,7 +2035,7 @@ class LogTest {
     assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
 
     // deliberately remove some of the epoch entries
-    leaderEpochCache.clearLatest(2)
+    leaderEpochCache.clearAndFlushLatest(2)
     assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
     log.close()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index afd1f35..8460fe4 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -401,7 +401,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When clear latest on epoch boundary
-    cache.clearLatest(offset = 8)
+    cache.clearAndFlushLatest(offset = 8)
 
     //Then should remove two latest epochs (remove is inclusive)
     assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
@@ -418,7 +418,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset ON epoch boundary
-    cache.clearEarliest(offset = 8)
+    cache.clearAndFlushEarliest(offset = 8)
 
     //Then should preserve (3, 8)
     assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -435,7 +435,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearEarliest(offset = 9)
+    cache.clearAndFlushEarliest(offset = 9)
 
     //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -452,7 +452,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset before first epoch offset
-    cache.clearEarliest(offset = 1)
+    cache.clearAndFlushEarliest(offset = 1)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -469,7 +469,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset on earliest epoch boundary
-    cache.clearEarliest(offset = 6)
+    cache.clearAndFlushEarliest(offset = 6)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -486,7 +486,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When
-    cache.clearEarliest(offset = 11)
+    cache.clearAndFlushEarliest(offset = 11)
 
     //Then retain the last
     assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -503,7 +503,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When we clear from a postition between offset 8 & offset 11
-    cache.clearEarliest(offset = 9)
+    cache.clearAndFlushEarliest(offset = 9)
 
     //Then we should update the middle epoch entry's offset
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -520,7 +520,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 2, offset = 10)
 
     //When we clear from a postition between offset 0 & offset 7
-    cache.clearEarliest(offset = 5)
+    cache.clearAndFlushEarliest(offset = 5)
 
     //Then we should keeep epoch 0 but update the offset appropriately
     assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -537,7 +537,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset beyond last epoch
-    cache.clearEarliest(offset = 15)
+    cache.clearAndFlushEarliest(offset = 15)
 
     //Then update the last
     assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -554,7 +554,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearLatest(offset = 9)
+    cache.clearAndFlushLatest(offset = 9)
 
     //Then should keep the preceding epochs
     assertEquals(3, cache.latestEpoch())
@@ -572,7 +572,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When 
-    cache.clear()
+    cache.clearAndFlush()
 
     //Then 
     assertEquals(0, cache.epochEntries.size)
@@ -589,7 +589,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearLatest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -606,7 +606,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 4, offset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearEarliest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -645,7 +645,7 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //Then
-    cache.clearEarliest(7)
+    cache.clearAndFlushEarliest(7)
   }
 
   @Test
@@ -657,7 +657,7 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //Then
-    cache.clearLatest(7)
+    cache.clearAndFlushLatest(7)
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5d9e7c1..05d9686 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -217,6 +217,7 @@ object TestUtils extends Logging {
     props.put(KafkaConfig.ControllerSocketTimeoutMsProp, "1500")
     props.put(KafkaConfig.ControlledShutdownEnableProp, enableControlledShutdown.toString)
     props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
+    props.put(KafkaConfig.LogDeleteDelayMsProp, "1000")
     props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
     props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
@@ -1105,6 +1106,24 @@ object TestUtils extends Logging {
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
     }), "Cleaner offset for deleted partition should have been removed")
+    import scala.collection.JavaConverters._
+    TestUtils.waitUntilTrue(() => servers.forall(server =>
+      server.config.logDirs.forall { logDir =>
+        topicPartitions.forall { tp =>
+          !new File(logDir, tp.topic + "-" + tp.partition).exists()
+        }
+      }
+    ), "Failed to soft-delete the data to a delete directory")
+    TestUtils.waitUntilTrue(() => servers.forall(server =>
+      server.config.logDirs.forall { logDir =>
+        topicPartitions.forall { tp =>
+          !java.util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName =>
+            partitionDirectoryName.startsWith(tp.topic + "-" + tp.partition) &&
+              partitionDirectoryName.endsWith(Log.DeleteDirSuffix)
+          }
+        }
+      }
+    ), "Failed to hard-delete the delete directory")
   }
 
   /**