Posted to jira@kafka.apache.org by "M. Manna (JIRA)" <ji...@apache.org> on 2017/07/09 18:08:00 UTC

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

    [ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678 ] 

M. Manna commented on KAFKA-1194:
---------------------------------

I believe I have found a workaround (and possibly a solution). The root cause is probably that on Windows the JVM maps files without FILE_SHARE_DELETE (via some internal low-level API call), so the open handle held by the mapping makes Files.move() fail. Perhaps a future JDK will make this configurable.
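
To illustrate the failure mode outside of Kafka, here is a minimal standalone sketch (the file name and size are made up for the example). On Windows the final Files.move() is expected to fail with a FileSystemException while the buffer is still mapped, whereas on Linux it succeeds:

{code:java}
import java.io.RandomAccessFile
import java.nio.channels.FileChannel
import java.nio.file.{Files, Paths, StandardCopyOption}

object MappedRenameDemo extends App {
  // Create a small file and map it, the same way Kafka maps its index files.
  val path = Paths.get("demo.index")
  Files.write(path, new Array[Byte](1024))
  val raf = new RandomAccessFile(path.toFile, "rw")
  val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024)
  raf.close() // closing the channel does NOT release the mapping

  // On Windows this rename typically fails ("The process cannot access the file ...")
  // because the live mapping holds an open handle without FILE_SHARE_DELETE.
  Files.move(path, Paths.get("demo.index.deleted"), StandardCopyOption.ATOMIC_MOVE)
}
{code}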

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
// Wrapper so code outside AbstractIndex can release the index's memory mapping
// before the backing file is renamed.
def forceUnmapWrapper() {
  forceUnmap(mmap)
}
{code}
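
For context, explicitly unmapping a MappedByteBuffer on a pre-Java 9 JVM generally goes through the JDK-internal cleaner, roughly along the lines below. This is only a sketch of the technique, not the actual AbstractIndex.forceUnmap implementation:

{code:java}
import java.nio.MappedByteBuffer

// Sketch only: relies on the JDK-internal sun.nio.ch.DirectBuffer / sun.misc.Cleaner
// API, which is what makes explicit unmapping possible (but not portable) on JDK 8.
def unmap(buffer: MappedByteBuffer): Unit = buffer match {
  case db: sun.nio.ch.DirectBuffer if db.cleaner() != null => db.cleaner().clean()
  case _ => // nothing to release for a non-direct buffer
}
{code}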

2) In LogSegment.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

    def kafkaStorageException(fileType: String, e: IOException) =
      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

    logger.warn("KAFKA mod - starting log renameTo op")
    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("log", e)
    }

    // Release the index's memory mapping before the rename; otherwise the rename fails on Windows.
    logger.warn("KAFKA mod - starting index renameTo op")
    index.forceUnmapWrapper()
    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("index", e)
    }

    // Same for the time index.
    logger.warn("KAFKA mod - starting timeIndex renameTo op")
    timeIndex.forceUnmapWrapper()
    try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("timeindex", e)
    }
  }
{code}

With these changes in place, the following output is produced on startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000000000.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000030240.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000030240.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000047520.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000047520.timeindex.deleted (kafka.log.TimeIndex)
{code}

I am running with log.retention.minutes=10 and log.retention.check.interval.ms=300000. The cleanup is not always triggered exactly when I expect, but when it does run, the old segments are now deleted.
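
For reference, these retention settings correspond to the following server.properties entries (log.cleanup.policy=delete is the broker default and is assumed here):

{code:java}
# server.properties on the test brokers
log.retention.minutes=10
log.retention.check.interval.ms=300000
# log.cleanup.policy=delete (default) - time-based retention only applies under this policy
{code}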

If someone is kind enough to verify this solution and propose a commit, we could try it out in a future release.

> The kafka broker cannot delete the old log files after the configured time
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-1194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1194
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.1
>         Environment: Windows
>            Reporter: Tao Qin
>            Priority: Critical
>              Labels: features, patch
>         Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, kafka-1194-v2.patch, screenshot-1.png, Untitled.jpg
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 1516723
>          at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>          at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>          at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>          at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>          at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>          at scala.collection.immutable.List.foreach(List.scala:76)
>          at kafka.log.Log.deleteOldSegments(Log.scala:418)
>          at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>          at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>          at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>          at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>          at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>          at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>          at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>          at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>          at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>          at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>          at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc describes it as follows:
> A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)