Posted to dev@kafka.apache.org by "Jay Kreps (JIRA)" <ji...@apache.org> on 2012/11/26 23:30:58 UTC

[jira] [Created] (KAFKA-631) Implement log compaction

Jay Kreps created KAFKA-631:
-------------------------------

             Summary: Implement log compaction
                 Key: KAFKA-631
                 URL: https://issues.apache.org/jira/browse/KAFKA-631
             Project: Kafka
          Issue Type: New Feature
          Components: core
    Affects Versions: 0.8.1
            Reporter: Jay Kreps


Currently Kafka has only one way to bound the space of the log, namely by deleting old segments. The policy that controls which segments are deleted can be configured based either on the number of bytes to retain or the age of the messages. This makes sense for event or log data which has no notion of primary key. However lots of data has a primary key and consists of updates by primary key. For this data it would be nice to be able to ensure that the log contained at least the last version of every key.

As an example, say that the Kafka topic contains a sequence of User Account messages, each capturing the current state of a given user account. Rather than simply discarding old segments, since the set of user accounts is finite, it might make more sense to delete individual records that have been made obsolete by a more recent update for the same key. This would ensure that the topic contained at least the current state of each record.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-631) Implement log compaction

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13504192#comment-13504192 ] 

Jay Kreps commented on KAFKA-631:
---------------------------------

Here is a specific proposal:

We will retain the existing settings that retain segments based on bytes and time, with data prior to these limits left untouched. We will introduce a new per-topic setting, "cleanup.policy"={delete, dedupe}. cleanup.policy=delete will correspond to the current behavior. cleanup.policy=dedupe will correspond to the new behavior described in this JIRA. As now, data that falls inside the retention window will not be touched, but data that is outside that window will be deduplicated rather than deleted. It is intended that this be a per-topic setting specified at topic creation time. As a short-cut for the purpose of this ticket I will just add a configuration map setting the policy in the way we have for other topic-level settings; these can all be refactored into something set in the create/alter topic command as a follow-up item.
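
For illustration only, the difference between the two policies can be sketched in Python (Kafka itself is written in Scala, so this is not the proposed implementation). The segment tuples, the function names, and the exact way the byte and time limits are evaluated are assumptions made for this sketch; the ticket only says that data outside the retention window is deduplicated instead of deleted.

```python
def segments_outside_retention(segments, now_ms, retention_ms, retention_bytes):
    """Return names of the oldest segments that fall outside the retention window.

    segments: oldest-to-newest list of (name, size_bytes, last_modified_ms).
    A segment is outside the window if it is older than retention_ms, or if
    the log is still larger than retention_bytes when we reach it.
    (Hypothetical simplification of the existing retention checks.)
    """
    outside = []
    total_bytes = sum(size for _, size, _ in segments)
    for name, size, last_modified_ms in segments:
        too_old = now_ms - last_modified_ms > retention_ms
        too_big = total_bytes > retention_bytes
        if not (too_old or too_big):
            break  # this segment and everything newer is inside the window
        outside.append(name)
        total_bytes -= size
    return outside


def apply_cleanup_policy(policy, segments, now_ms, retention_ms, retention_bytes):
    """Decide what happens to segments outside the window under each policy."""
    outside = segments_outside_retention(segments, now_ms, retention_ms, retention_bytes)
    if policy == "delete":
        return {"delete": outside, "dedupe": []}
    if policy == "dedupe":
        return {"delete": [], "dedupe": outside}
    raise ValueError("unknown cleanup policy: %r" % policy)


example_segments = [("s0", 100, 1000), ("s1", 100, 2000), ("s2", 100, 9000)]
plan = apply_cleanup_policy("dedupe", example_segments,
                            now_ms=10000, retention_ms=5000, retention_bytes=1000)
```

In this example the two oldest segments are handed to the cleaner rather than removed, while the newest segment is left alone under either policy.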

Topics with dedupe enabled will be processed by a pool of background "cleaner" threads. These threads will periodically recopy old segment files, removing obsolete messages, and swap in the new deduplicated files in place of the old segments. The resulting sparse files should already be well-supported by the logical and sparse offset work in 0.8.

Here are the specific changes intended:
- Add a few new configs: 
   - topic.cleanup.policy={delete,dedupe} // A map of cleanup policies, defaults to delete
   - cleaner.thread.pool.size=# // The number of background threads to use for cleaning 
   - cleaner.buffer.size.bytes=# // The maximum amount of heap memory per cleaner thread that can be used for log deduplication
   - cleaner.max.{read,write}.throughput=# // The maximum bytes per second the cleaner can read or write
- Add a new method Log.replaceSegments() that replaces one or more old segments with a new segment while holding the log lock
- Implement a background cleaner thread that does the recopying. This thread will be owned and maintained by LogManager
- Add a new file per data directory called cleaner-metadata that maintains the cleaned section of the logs in that directory that have dedupe enabled. This allows the cleaner to restart cleaning from the same point upon restart.
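
As a rough illustration of the cleaner-metadata idea in the last item, the following Python sketch persists one cleaned position per log and reads it back after a restart. The file layout (one "topic partition offset" line per log), the write-to-temp-then-rename step, and the function names are all assumptions; the ticket does not specify a format.

```python
import os
import tempfile


def write_checkpoint(path, cleaned_offsets):
    """Persist {(topic, partition): first_uncleaned_offset} to `path`.

    Writes to a temp file in the same directory and renames it into place,
    so a crash never leaves a half-written checkpoint behind.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        for (topic, partition), offset in sorted(cleaned_offsets.items()):
            f.write("%s %d %d\n" % (topic, partition, offset))
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, path)


def read_checkpoint(path):
    """Load the checkpoint; a missing file means nothing has been cleaned."""
    if not os.path.exists(path):
        return {}
    cleaned_offsets = {}
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue
            topic, partition, offset = line.split()
            cleaned_offsets[(topic, int(partition))] = int(offset)
    return cleaned_offsets
```

On restart, a cleaner built this way would call read_checkpoint once per data directory and resume each log from the recorded offset.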

The cleaning algorithm for a single log will work as follows:
1. Scan the head of the log (i.e. all messages since the last cleaning) and create a Map of key => offset for messages in the head of the log. If the cleaner buffer is too small to scan the full head of the log then just scan whatever fits going from oldest to newest.
2. Sequentially clean segments from oldest to newest.
3. To clean a segment, first create a new empty copy of the segment file with a temp name. Check each message in the original segment. If it is contained in the map with a higher offset, ignore it; otherwise recopy it to the new temp segment. When the segment is complete swap in the new file and delete the old. 
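
The three steps above can be sketched in Python as follows. This is an in-memory model for illustration, not the proposed implementation: segments are lists of (offset, key, value) tuples rather than files, the buffer limit is counted in map entries rather than bytes, and the function names are made up for this example.

```python
def build_offset_map(head_messages, max_entries):
    """Step 1: map each key to its latest offset in the head of the log.

    Scans oldest to newest and stops at the first new key that does not fit,
    mirroring "just scan whatever fits".
    """
    offset_map = {}
    for offset, key, _value in head_messages:
        if key not in offset_map and len(offset_map) >= max_entries:
            break
        offset_map[key] = offset
    return offset_map


def clean_segment(segment, offset_map):
    """Step 3: recopy every message not superseded by a higher offset."""
    cleaned = []
    for offset, key, value in segment:
        latest = offset_map.get(key)
        if latest is not None and latest > offset:
            continue  # a newer message for this key exists; drop this one
        cleaned.append((offset, key, value))
    return cleaned


def clean_log(segments, offset_map):
    """Step 2: clean segments sequentially, oldest to newest."""
    return [clean_segment(segment, offset_map) for segment in segments]


segments = [
    [(0, "alice", "v1"), (1, "bob", "v1")],
    [(2, "alice", "v2"), (3, "carol", "v1")],
    [(4, "bob", "v2"), (5, "alice", "v3")],
]
head = [message for segment in segments for message in segment]  # nothing cleaned yet
offset_map = build_offset_map(head, max_entries=1000)
cleaned = clean_log(segments, offset_map)
```

Surviving messages keep their original offsets, which is why the cleaned segments are sparse. In this example the first segment ends up empty and the second holds a single message.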

The threads will iterate over the logs and clean them periodically (I am not yet sure of the right frequency).

Some Nuances:
1. The above tends to lead to smaller and smaller segment files in the tail of the log as records are overwritten. To avoid this we will combine files; that is, we will always collect the largest set of files that together are smaller than the max segment size into a single segment. Obviously this will be based on the starting sizes, so the resulting segment will likely still be smaller than the max segment size.
2. The recopying procedure depends on the property that logs are immutable. However our logs are only mostly immutable. It is possible to truncate a log to any segment. It is important that the cleaner respect this and not have a race condition with potential truncate operations. But likewise we can't lock for the duration of the cleaning as it may be quite slow. To work around this I will add a generation counter to the log. Each truncate operation will increment this counter. The cleaner will record the generation before it begins cleaning, and the swap operation that swaps in the new, cleaned segment will only occur if the generations match (i.e. if no truncates happened in that segment during cleaning). This will potentially result in some wasted cleaner work when truncations collide with cleanings, but since truncates are rare, and truncates deep enough into the log to interact with cleaning are rarer still, this should almost never happen.
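
Both nuances can be sketched together in Python. This is an illustration under stated assumptions, not the proposed implementation: segments are represented by names and sizes only, grouping is restricted to adjacent segments and allows a group to equal the max size exactly, and the generation counter is kept per log, so any truncate aborts a pending swap. The class and method names are hypothetical.

```python
import threading


def group_segments_by_size(segment_sizes, max_segment_size):
    """Nuance 1: greedily group adjacent segments (by index) so that each
    group's starting sizes add up to at most max_segment_size."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(segment_sizes):
        if current and current_size + size > max_segment_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


class Log:
    """Nuance 2: a log whose segment swap is guarded by a generation counter."""

    def __init__(self, segments):
        self._lock = threading.Lock()
        self.segments = list(segments)
        self.generation = 0

    def truncate_to(self, num_segments):
        """Keep only the first num_segments segments and bump the generation."""
        with self._lock:
            self.segments = self.segments[:num_segments]
            self.generation += 1

    def replace_segments(self, old_segments, new_segment, expected_generation):
        """Swap in new_segment only if no truncate has happened since the
        cleaner recorded expected_generation. Returns True on success."""
        with self._lock:
            if self.generation != expected_generation:
                return False  # a truncate raced with cleaning; discard the work
            start = self.segments.index(old_segments[0])
            self.segments[start:start + len(old_segments)] = [new_segment]
            return True
```

The lock is held only for the generation check and the swap, not for the slow recopying, which happens between reading the generation and calling replace_segments. A cleaner whose swap is rejected simply throws away its temp segment and retries on a later pass.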
                
