Posted to dev@kafka.apache.org by "Jay Kreps (JIRA)" <ji...@apache.org> on 2012/10/02 23:35:07 UTC

[jira] [Updated] (KAFKA-506) Store logical offset in log

     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v3.patch

New patch with a few new things:

I rebased a few more times to pick up changes.

WRT Neha's comments:
- I made maxIndexEntries configurable by adding the property log.index.max.size. I did this in terms of index file size rather than entries since the user doesn't really know the entry size but may care about the file size.
- For the failing tests: (1) The message set failure is due to scalatest not handling parameterized tests. I had fixed this but somehow it didn't make it into the previous patch. It is in the current one. (2) The testHWCheckpointWithFailuresSingleLogSegment failure comes from a timing assumption in that test. Fixed it by adding a sleep :-(. (3) The producer test failure I cannot reproduce.
- Wrote a test case using compressed messages to try to produce the corner case at the end of a segment. But actually this turns out not to be possible with compressed messages since the numbering is by the last offset. So effectively our segments are always dense right now. As such I would rather wait until I refactor segment list to fix it since it will be duplicate work otherwise.
- Turns out that log segments are limited to 2GB already, via a restriction in the config. Not actually sure why this is. Given this limitation one cleanup that might be nice to do would be to convert MessageSet.sizeInBytes to an Int, which would remove a lot of casts. Since this is an unrelated cleanup I will not do it in this patch.
- I added support to DumpLogSegment tool to display the index file. I had to revert Jun's change to check that last offset=file size since this is no longer true.
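
A minimal sketch of how a byte-based limit such as log.index.max.size could translate into an entry count. The 8-byte entry size and the names IndexSizingSketch, maxEntries and isFull are assumptions made for illustration; they are not taken from the patch.

```java
final class IndexSizingSketch {
    // Assumption: each index entry is a 4-byte relative offset plus a 4-byte file position.
    static final int ENTRY_SIZE_BYTES = 8;

    // Number of whole entries that fit in an index file of the configured size.
    static int maxEntries(int maxIndexSizeBytes) {
        if (maxIndexSizeBytes < ENTRY_SIZE_BYTES) {
            throw new IllegalArgumentException("index size must hold at least one entry");
        }
        return maxIndexSizeBytes / ENTRY_SIZE_BYTES;
    }

    // True once the index has no room for another entry.
    static boolean isFull(int entries, int maxIndexSizeBytes) {
        return entries >= maxEntries(maxIndexSizeBytes);
    }
}
```

Configuring the limit in bytes keeps the entry size an internal detail: if the entry layout changes, the user-facing setting still means the same thing.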

Jun's Comments:
First of all, this is an impressively thorough code review. Thanks!
20.1 Made the Log.findRange comment more reflective of what the method does. I hope to remove this entirely in the next phase.
20.2 Fixed mangled paren in close()
20.3 bytesSinceLastIndexEntry. Yes, good catch. This is screwed up. This was moved into LogSegment, but the read and update are split in two places. Fixed.
20.4 append(): "We need to have both the begin offset and the end offset returned by Log.append()". Made Log.append return (Long, Long). I am not wild about this change, but I see the need. I had to refactor KafkaApis slightly since we were constructing an intermediate response object in the produceToLocalLog method (which was kind of weird anyway) so there was only one offset and since this is an API object we can't change it. I think the use of API objects in the business logic is a bit dangerous for this reason.
20.5 Fixed broken log statement to use correct format param.
20.6 truncateTo(): The usage of logEndOffset in the following statement is incorrect. Changed this to use Log.findInRange which I think is the intention.
20.7 "There are several places where we need to create a log segment and the code for creating the new data file and the new index file is duplicate. Could we create a utility function createNewSegment to share the code?" Good idea, done. There is still a lot more refactoring that could be done between Log and LogSegment, but I am kind of putting that off.
21. LogSegment: "bytesSinceLastIndexEntry needs to be updated in append()." Fixed.
22. FileMessageSet.searchFor() fixed bad byte arithmetic.
23. OffsetIndex:
23.1 Fixed bad English in comment
23.2 mmap initialization: Yes, this doesn't make sense. The correct logic is that the mutable case must be set to index 0, and the read-only case doesn't matter. This was happening implicitly since byte buffers initialize to 0, but I switched it to make it explicit.
23.3 append(): "If index entry is full, should we automatically roll the log segment?" This is already handled in Log.maybeRoll(segment) which checks segment.index.isFull
23.4 makeReadOnly(): "should we call flush after raf.setLength()?" This is a good point. I think what you are saying is that the truncate call itself needs the metadata to flush to be considered stable. Calling force on the mmap after the setLength won't do this. Instead I changed the file open to use synchronous mode "rws" which should automatically fsync metadata when we call setLength. The existing flush is okay: I verified that flush doesn't cause the sparse file to desparsify or anything like that. "Also, should we remap the index file to the current length and make it read only?" Well, this isn't really needed. There is no problem with truncating a file post mmap, but I guess making the mapping read-only could prevent corruption due to any bugs we might have so I made that change.
LogManager
24. "log indentation already adds LogManager in the prefix of each log entry." Oops.
25. KafkaApis:
25.1 "handleFetchRequest: topicDatas is weird since data is the plural form of datum. How about topicDataMap?" Changed to dataRead (I don't like having the type in the name).
25.2 "ProducerRequestPurgatory: It seems that it's useful to keep the logIndent since it can distinguish logs from the ProducerRequestPurgatory and FetchRequestPurgatory. Also, it's probably useful to pass in brokerId to RequestPurgatory for debugging unit tests." Agreed, accidentally removed this; added it back.
26. "Partition: There are a few places that the first character of info log is changed to lower case. The current convention is to already use upper case." Made all upper case.
27. "javaapi.ByteBufferMessageSet: underlying should be private val." Changed.
28. "DumpLogSegment: Now that each message stores an offset, we should just print the offset in MessageAndOffset. There is no need for var offset now." Removed.
29. "FetchedDataChunk: No need to use val for parameters in constructor since this is a case class now." Wait, is everything a val in a case class? I made this change, but don't know what it means...
30. PartitionData:
30.1 "No need to redefine equals and hashcode since this is already a case class." Yeah, this was fixing a bug in the equals/hashcode stuff due to the array that went away when I rebased. Removed it.
30.2 "initialOffset is no longer needed." I think PartitionData is also used by ProducerRequest. This is a bug, but I think we do need the initial offset for the other case. Until we separate these two I don't think I can remove it.
31. "PartitionTopicInfo.enqueue(): It seems that next can be computed using shallow iterator." Ah, very nice. Changed that.
32. "ByteBufferMessageSet: In create() and decompress(), we probably should close the output and the input stream in a finally clause in case we hit any exception during compression and decompression." These are not real output streams. I can close them, but they are just arrays so I think it is just noise, no?
33. "remove unused imports." Eclipse doesn't identify them, will swing by.
34. "How do we handle the case that a consumer uses too small a fetch size?" Added a check and throw for this in ConsumerIterator.
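
The sequence described in 23.4 (open in "rws" mode, write through a mutable mapping, trim with setLength, then remap read-only) can be sketched as follows. This is an illustration under assumptions, not the code in the patch: the class and method names are hypothetical, the 8-byte entry layout is assumed, and shrinking a file that is still mapped is assumed to be allowed by the platform (it generally is on Linux, but not on Windows).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

final class IndexFileSketch {
    /**
     * Preallocates the file, writes (relativeOffset, position) pairs through a
     * mutable mapping, trims the file to the bytes actually used, and returns
     * a read-only mapping of the trimmed file.
     */
    static MappedByteBuffer writeThenFreeze(Path file, int maxBytes, int[][] entries)
            throws IOException {
        // "rws" asks for content and metadata updates to be written synchronously.
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rws")) {
            raf.setLength(maxBytes); // preallocate the full index size
            MappedByteBuffer mutable =
                raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, maxBytes);
            mutable.position(0); // make the starting position explicit
            for (int[] entry : entries) {
                mutable.putInt(entry[0]); // relative logical offset
                mutable.putInt(entry[1]); // physical position in the log file
            }
            mutable.force(); // flush the mapped contents
            int used = mutable.position();
            raf.setLength(used); // trim the unused tail of the file
            // A mapping stays valid after the channel that created it is closed.
            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, used);
        }
    }
}
```

Any later attempt to write through the returned buffer throws ReadOnlyBufferException, which is the protection against stray writes mentioned above.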

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).
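
The log format listed in the issue description (8 byte offset, 4 byte message_size, N byte message) can be sketched as follows. The class LogEntrySketch and its methods are hypothetical names used only for illustration, and the linear scan stands in for the memory-mapped lookup structure, which is not modeled here.

```java
import java.nio.ByteBuffer;

final class LogEntrySketch {
    static final int HEADER_BYTES = 8 + 4; // 8 byte offset + 4 byte message_size

    // Appends one entry: logical offset, message size, then the message bytes.
    static void append(ByteBuffer log, long offset, byte[] message) {
        log.putLong(offset);
        log.putInt(message.length);
        log.put(message);
    }

    // Scans from the start of the log for the entry with the given logical
    // offset. Returns the message bytes, or null if that offset is absent
    // (for example because the message was deleted).
    static byte[] find(ByteBuffer log, long targetOffset) {
        ByteBuffer view = log.duplicate();
        view.flip(); // read from 0 up to the current write position
        while (view.remaining() >= HEADER_BYTES) {
            long offset = view.getLong();
            int size = view.getInt();
            if (offset == targetOffset) {
                byte[] message = new byte[size];
                view.get(message);
                return message;
            }
            view.position(view.position() + size); // skip this message
        }
        return null;
    }
}
```

Because every entry carries its own logical offset, a log holding offsets 0, 1, 3 is still readable after message 2 has been removed, which is the property the retention proposal depends on.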

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira