You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/12 02:53:36 UTC
kafka git commit: KAFKA-3047: Explicit offset assignment in
Log.append can corrupt the log
Repository: kafka
Updated Branches:
refs/heads/trunk a162f6bf6 -> c9311d5f4
KAFKA-3047: Explicit offset assignment in Log.append can corrupt the log
This fix was suggested by Maciek Makowski, who also reported the problem.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1029 from ijuma/KAFKA-3047-log-append-can-corrupt-the-log
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9311d5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9311d5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9311d5f
Branch: refs/heads/trunk
Commit: c9311d5f4ec3b135cb6c0f87008da946863daaa2
Parents: a162f6b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Mar 11 17:53:32 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Mar 11 17:53:32 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9311d5f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index fd176b1..8c956f7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -328,11 +328,11 @@ class Log(val dir: File,
try {
// they are valid, insert them in the log
lock synchronized {
- appendInfo.firstOffset = nextOffsetMetadata.messageOffset
if (assignOffsets) {
// assign offsets to the message set
val offset = new LongRef(nextOffsetMetadata.messageOffset)
+ appendInfo.firstOffset = offset.value
val now = time.milliseconds
val (validatedMessages, messageSizesMaybeChanged) = try {
validMessages.validateMessagesAndAssignOffsets(offset,