Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2012/10/01 05:56:12 UTC

[jira] [Commented] (KAFKA-506) Store logical offset in log

    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13466626#comment-13466626 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Thanks for patch v2. Some more comments:

20. Log:
20.1 findRange(): Add to the comment that now this method returns the largest segment file <= the requested offset.
20.2 close(): move the closing } for the for loop to a new line.
20.3 bytesSinceLastIndexEntry is only set but is never read.
20.4 append(): This method returns the offset of the first appended message, which is fine for the producer response. However, to determine whether all replicas have received the appended messages, we need the log end offset after the append. Ideally append() would return two offsets, one before the append and one after: the former for the producer response and the latter for the replica check. To avoid complicating this patch further, an alternative is to have append() return only the post-append log end offset in this jira, use it for both the producer response and the replica check, and file a separate jira to have append() return two offsets.
20.5 read(): The trace statement: last format pattern should be %d instead of %s.
20.6 truncateTo(): The usage of logEndOffset in the following statement is incorrect. It should be the offset of the next segment.
          segments.view.find(segment => targetOffset >= segment.start && targetOffset < logEndOffset)
20.7 There are several places where we need to create a log segment and the code for creating the new data file and the new index file is duplicate. Could we create a utility function createNewSegment to share the code?
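
The two-offset shape suggested in 20.4 could look roughly like the following (a Java sketch for illustration only; the Kafka code is Scala, and the class and field names here are hypothetical):

```java
// Hypothetical sketch of the two-offset append() from 20.4; names are
// illustrative, not the actual Kafka Log code.
class LogSketch {
    long logEndOffset;

    LogSketch(long logEndOffset) { this.logEndOffset = logEndOffset; }

    // Returns {offset before the append, log end offset after the append}:
    // the former for the producer response, the latter for the replica check.
    long[] append(int messageCount) {
        long first = logEndOffset;
        logEndOffset += messageCount;
        return new long[] { first, logEndOffset };
    }
}
```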

21. LogSegment: bytesSinceLastIndexEntry needs to be updated in append().

22. FileMessageSet.searchFor(): The following check seems to be a bit strange. Shouldn't we use position + 12 or just position instead?
    while(position + 8 < size) {
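
The concern is that the new on-disk entry header is 8 bytes of offset plus 4 bytes of size, so a complete header fits at `position` only while `position + 12 <= size`. A minimal illustration of that bound (Java sketch; names are hypothetical, not FileMessageSet's code):

```java
// Illustrative only: with the new format, an entry header is 8 bytes of
// offset plus 4 bytes of message_size, so a full header fits only while
// position + 12 <= size.
class SearchBound {
    static final int HEADER_SIZE = 8 + 4;

    static boolean headerFits(int position, int size) {
        return position + HEADER_SIZE <= size;
    }
}
```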

23. OffsetIndex:
23.1 In the comment, "mutable index can be created to" seems to have a grammar bug.
23.2 mmap initialization: The following statement seems unnecessary. However, we do need to set the mapped buffer's position to end of file for mutable indexes. 
          idx.position(idx.limit).asInstanceOf[MappedByteBuffer]
23.3 append(): If index entry is full, should we automatically roll the log segment? It's ok if this is tracked in a separate jira.
23.4 makeReadOnly(): should we call flush after raf.setLength()? Also, should we remap the index file to the current length and make it read only?
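
The flush-then-remap idea in 23.4 could be sketched as follows (a Java sketch under stated assumptions; the class and method names are illustrative, not Kafka's OffsetIndex):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of the 23.4 suggestion: trim the preallocated index file to its
// final length, force it to disk, then remap it read-only.
class IndexSketch {
    static MappedByteBuffer makeReadOnly(File file, long lengthBytes) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(lengthBytes);             // drop unused preallocated space
            raf.getChannel().force(true);           // flush after setLength()
            // remap at the final length; the mapping stays valid after close()
            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, lengthBytes);
        }
    }
}
```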

24. LogManager.shutdown(): logIdent already adds LogManager to the prefix of each log entry.

25. KafkaApis:
25.1 handleFetchRequest: topicDatas is weird since data is the plural form of datum. How about topicDataMap?
25.2 ProducerRequestPurgatory: It seems useful to keep the logIdent, since it can distinguish logs from ProducerRequestPurgatory and FetchRequestPurgatory. It is probably also useful to pass brokerId into RequestPurgatory for debugging unit tests.

26. Partition: In a few places the first character of an info log message was changed to lower case. The existing convention is to use upper case.

27. javaapi.ByteBufferMessageSet: underlying should be private val.

28. DumpLogSegment: Now that each message stores an offset, we should just print the offset in MessageAndOffset. There is no need for var offset now.

29. FetchedDataChunk: No need to use val for parameters in constructor since this is a case class now.

30. PartitionData:
30.1 No need to redefine equals and hashCode since this is already a case class.
30.2 initialOffset is no longer needed.

31. PartitionTopicInfo.enqueue(): It seems that next can be computed using a shallow iterator, since the offset of a compressed message is always the offset of its last internal message.
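
In other words, with logical offsets the next fetch offset follows directly from the last shallow offset, as in this minimal Java sketch (names hypothetical):

```java
// Illustrative: a compressed wrapper message carries the offset of its last
// inner message, so the next fetch offset is just the last shallow offset + 1;
// no deep iteration is needed to compute it.
class ShallowNext {
    static long nextFetchOffset(long[] shallowOffsets) {
        return shallowOffsets[shallowOffsets.length - 1] + 1;
    }
}
```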

32. ByteBufferMessageSet: In create() and decompress(), we probably should close the output and the input stream in a finally clause in case we hit any exception during compression and decompression.
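
The close-in-finally pattern being suggested is the standard one; a Java sketch (GZIP stands in here for whichever codec is configured, and the names are illustrative rather than Kafka's actual create()/decompress() code):

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of the close-in-finally pattern suggested for create()/decompress().
class CompressSketch {
    static byte[] compress(byte[] payload) throws java.io.IOException {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(byteStream);
        try {
            gzip.write(payload);
        } finally {
            gzip.close();   // close even if write() throws, so no stream leaks
        }
        return byteStream.toByteArray();
    }
}
```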

33. Remove unused imports.

The following comment from the first round of review is still not addressed.
10. How do we handle the case that a consumer uses too small a fetch size?

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that cleans obsolete values by a user-defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).
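
The record layout described in point 1 of the quoted description can be sketched as follows (a Java illustration; the class and method names are hypothetical, not the actual log code):

```java
import java.nio.ByteBuffer;

// Sketch of the proposed record layout:
// 8-byte offset, 4-byte message_size, then message_size bytes of message.
class RecordSketch {
    static void writeRecord(ByteBuffer buf, long offset, byte[] message) {
        buf.putLong(offset);          // 8-byte logical offset
        buf.putInt(message.length);   // 4-byte message_size
        buf.put(message);             // N-byte message
    }

    static long readOffset(ByteBuffer buf) {
        long offset = buf.getLong();
        int size = buf.getInt();
        buf.position(buf.position() + size);   // skip over the message bytes
        return offset;
    }
}
```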

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira