Posted to dev@kafka.apache.org by "Kyle Ambroff (JIRA)" <ji...@apache.org> on 2017/05/20 16:11:04 UTC

[jira] [Created] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

Kyle Ambroff created KAFKA-5297:
-----------------------------------

             Summary: Broker can take a long time to shut down if there are many active log segments
                 Key: KAFKA-5297
                 URL: https://issues.apache.org/jira/browse/KAFKA-5297
             Project: Kafka
          Issue Type: Improvement
            Reporter: Kyle Ambroff
            Priority: Minor
         Attachments: shutdown-flame-graph.png

After the changes for KIP-33 were merged, we started noticing that our cluster restart times were quite a bit longer. In some cases it was taking four times as long as expected to do a rolling restart of every broker in the cluster. This meant that doing a deploy to one of our Kafka clusters went from taking about 3 hours to more than 12 hours!

We looked into this and we have some data from a couple of runs with a sampling profiler. It turns out that it isn't unusual for us to have a broker sit in kafka.log.Log#close for up to 30 minutes if it has been running for several weeks. There are so many active log segments that truncating all of the indexes takes a long time.

I've attached a flame graph that was generated from 10 minutes of stack samples collected during shutdown of a broker that took about 30 minutes total to shut down cleanly.

* About 60% of the time was spent in kafka.log.AbstractIndex#resize, where every index and timeindex file is truncated to the size needed for the entries it actually contains (number of entries * entry size).
* Another big chunk of time was spent reading the last entry from the index, which is used to make any final updates to the timeindex file. For a broker that's been running for a long time, the bulk of these indexes are not likely to be in the page cache anymore, so each read can hit the disk. This is something that can be cached: we already cache the largestTimestamp and offsetOfLargestTimestamp in LogSegment, so we could add a cache for the last index entry as well.

Looking at these changes and considering KIP-33, it isn't surprising that the broker shutdown time has increased so dramatically. The extra index plus the extra reads have increased the amount of work performed by kafka.log.Log#close by about 4x (in terms of system calls and potential page faults). Breaking down what this function does:

# Read the max timestamp from the timeindex. Could lead to a disk seek.
# Read the max offset from the index. Could lead to a disk seek.
# Append the timestamp and offset of the most recently written message to the timeindex if it hasn't been written there for some reason.
# Truncate the index file
## Get the position in the index of the last entry written
## If on Windows then unmap and close the index
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the truncation. Leads to lseek() system call.
## Close the newly reopened and mapped index
# Same thing as #4 but for the timeindex.
## Get the position in the timeindex of the last entry written
## If on Windows then unmap and close the timeindex
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the truncation. Leads to lseek() system call.
## Close the newly reopened and mapped timeindex
# Finalize the log segment
## Invoke java.nio.channels.FileChannel#force, which leads to an fsync() for that log segment.
## Truncate the log segment if it doesn't have enough messages written to fill up the whole thing. Potentially leads to an ftruncate() system call.
## Set the position to the end of the segment after truncation. Leads to an lseek() system call.
## Close and unmap the channel.
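
As a rough illustration of the index truncation in step 4, here is a minimal Java sketch of the reopen, truncate, mmap, reposition, close sequence. It is not Kafka's code: the class name IndexTrimSketch, the method trimToEntries, and the 8-byte ENTRY_SIZE are assumptions made for the example, and the Windows unmap step is left out.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Hypothetical sketch, not Kafka's AbstractIndex: it only mirrors the
// sequence of steps listed above for truncating one index file.
final class IndexTrimSketch {
    // Assumed entry size for the example (two 4-byte ints per entry).
    static final int ENTRY_SIZE = 8;

    // Truncates the file to entries * ENTRY_SIZE bytes, maps it again and
    // restores the buffer position. The caller must pass a position that
    // is not larger than the new length.
    static MappedByteBuffer trimToEntries(Path indexFile, int entries, int position)
            throws IOException {
        long newLength = (long) entries * ENTRY_SIZE;
        // Reopen the file just for this operation.
        try (RandomAccessFile raf = new RandomAccessFile(indexFile.toFile(), "rw")) {
            // Truncate to the space the entries actually use.
            raf.setLength(newLength);
            // Create a fresh mapping over the truncated file.
            MappedByteBuffer mmap =
                raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength);
            // Put the position back where it was before the truncation.
            mmap.position(position);
            return mmap;
        } // The file handle is closed here; the mapping stays valid.
    }
}
```

Every call pays for an open, a truncate, a new mapping and a close, which is the per-index cost that adds up across many segments.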

Looking into the current implementation of kafka.log.AbstractIndex#resize, it appears to do quite a bit of extra work to avoid keeping an instance of RandomAccessFile around. It has to reopen the file, truncate, mmap(), and potentially perform an additional disk seek, all before immediately closing the file.

You wouldn't think this would amount to much, but I put together a benchmark using jmh to measure the difference between the current code and a new implementation that didn't have to recreate the page mapping during resize(), and the difference is pretty dramatic.

{noformat}
Result "currentImplementation":
  2063.386 ±(99.9%) 81.758 ops/s [Average]
  (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
  CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)

Result "optimizedImplementation":
  3497.354 ±(99.9%) 31.575 ops/s [Average]
  (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
  CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)

# Run complete. Total time: 00:03:37

Benchmark                                     Mode  Cnt     Score    Error  Units
LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
{noformat}

I ran this benchmark on a Linux workstation. It just measures the throughput of Log#close after 20 segments have been created. Not having to reopen the file amounts to roughly a 70% increase in throughput (from about 2,063 to about 3,497 ops/s).

I think there are two totally valid approaches to making this better:

* Preemptively truncate index files when log rotation happens. Once a log is rotated, jobs could be added to an ExecutorService which truncates indexes so that they don't all have to be truncated on shutdown. The new shutdown code would enqueue all remaining active indexes and then drain the queue.
* Alternatively we could just add a RandomAccessFile instance variable to AbstractIndex so that it doesn't have to recreate the page mapping on resize(). This means an extra file handle for each segment but that doesn't seem like a big deal to me.
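
To make the first option more concrete, here is a minimal Java sketch of the queueing idea, assuming a single background thread. The class BackgroundTrimmerSketch and its methods are hypothetical names for this example, and each Runnable stands in for "truncate one segment's indexes"; none of this is existing Kafka code.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "truncate on rotation" idea. The Runnable
// jobs are placeholders for the real index truncation work.
final class BackgroundTrimmerSketch {
    // Assumption: one worker thread is enough; a real design might size this differently.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    // Called when a log segment is rotated: trim its indexes in the background.
    void onSegmentRolled(Runnable trimJob) {
        pool.execute(trimJob);
    }

    // Called on shutdown: enqueue whatever is still untrimmed, then drain the queue.
    // Returns true if all queued jobs finished before the timeout.
    boolean shutdown(List<Runnable> remaining, long timeout, TimeUnit unit)
            throws InterruptedException {
        for (Runnable job : remaining) {
            pool.execute(job);
        }
        pool.shutdown(); // accept no new jobs, but finish the queued ones
        return pool.awaitTermination(timeout, unit);
    }
}
```

With something like this, shutdown would only have to wait for the indexes of segments that had not been rotated yet, plus whatever is still sitting in the queue.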

No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry.
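
One way such a cache could look is sketched below in Java. This is only an illustration under assumptions: CachedLastEntrySketch is a made-up class, a heap ByteBuffer stands in for the mmap'd index file, and the 8-byte entry layout is chosen for the example rather than taken from OffsetIndex.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: remember the last appended entry in memory so that
// asking for it does not have to touch the (possibly paged-out) index file.
final class CachedLastEntrySketch {
    private static final int ENTRY_SIZE = 8; // assumed: two 4-byte ints per entry
    private final ByteBuffer buffer;          // stand-in for the mmap'd index
    private int entries = 0;
    private int cachedOffset = -1;            // -1 means "no entry yet"
    private int cachedPosition = -1;

    CachedLastEntrySketch(int maxEntries) {
        this.buffer = ByteBuffer.allocate(maxEntries * ENTRY_SIZE);
    }

    // Appends an entry and refreshes the cached copy at the same time.
    void append(int relativeOffset, int position) {
        buffer.putInt(relativeOffset);
        buffer.putInt(position);
        entries++;
        cachedOffset = relativeOffset;
        cachedPosition = position;
    }

    // Uncached path: reads the last slot from the buffer. With a real
    // mmap this read is what could cause a page fault.
    int[] readLastEntry() {
        if (entries == 0) {
            return new int[] {-1, -1};
        }
        int base = (entries - 1) * ENTRY_SIZE;
        return new int[] {buffer.getInt(base), buffer.getInt(base + 4)};
    }

    // Cached path: answers from fields, no buffer access.
    int[] lastEntry() {
        return new int[] {cachedOffset, cachedPosition};
    }
}
```

The sketch only covers appends. A real cache would also have to be refreshed or invalidated when the index is truncated or reloaded from disk.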




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)