You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/07 06:25:25 UTC
[2/3] kafka git commit: KAFKA-2929: Migrate duplicate error mapping
functionality
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 949dc02..b239a6c 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -27,6 +27,7 @@ import kafka.message._
import kafka.common.KafkaException
import java.util.concurrent.TimeUnit
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.network.TransportLayer
/**
@@ -44,9 +45,9 @@ class FileMessageSet private[kafka](@volatile var file: File,
private[log] val start: Int,
private[log] val end: Int,
isSlice: Boolean) extends MessageSet with Logging {
-
+
/* the size of the message set in bytes */
- private val _size =
+ private val _size =
if(isSlice)
new AtomicInteger(end - start) // don't check the file size if this is just a slice view
else
@@ -60,13 +61,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
/**
* Create a file message set with no slicing.
*/
- def this(file: File, channel: FileChannel) =
+ def this(file: File, channel: FileChannel) =
this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
-
+
/**
* Create a file message set with no slicing
*/
- def this(file: File) =
+ def this(file: File) =
this(file, FileMessageSet.openChannel(file, mutable = true))
/**
@@ -86,23 +87,23 @@ class FileMessageSet private[kafka](@volatile var file: File,
* Create a file message set with mutable option
*/
def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
-
+
/**
* Create a slice view of the file message set that begins and ends at the given byte offsets
*/
- def this(file: File, channel: FileChannel, start: Int, end: Int) =
+ def this(file: File, channel: FileChannel, start: Int, end: Int) =
this(file, channel, start, end, isSlice = true)
-
+
/**
* Return a message set which is a view into this set starting from the given position and with the given size limit.
- *
+ *
* If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
- *
+ *
* If this message set is already sliced, the position will be taken relative to that slicing.
- *
+ *
* @param position The start position to begin the read from
* @param size The number of bytes after the start position to include
- *
+ *
* @return A sliced wrapper on this message set limited based on the given position and size
*/
def read(position: Int, size: Int): FileMessageSet = {
@@ -115,7 +116,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
start = this.start + position,
end = math.min(this.start + position + size, sizeInBytes()))
}
-
+
/**
* Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
@@ -143,7 +144,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
null
}
-
+
/**
* Write some of this set to the given channel.
* @param destChannel The channel to write to.
@@ -168,15 +169,15 @@ class FileMessageSet private[kafka](@volatile var file: File,
+ " bytes requested for transfer : " + math.min(size, sizeInBytes))
bytesTransferred
}
-
+
/**
* Get a shallow iterator over the messages in the set.
*/
override def iterator() = iterator(Int.MaxValue)
-
+
/**
* Get an iterator over the messages in the set. We only do shallow iteration here.
- * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
+ * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
* If we encounter a message larger than this we throw an InvalidMessageException.
* @return The iterator.
*/
@@ -184,44 +185,44 @@ class FileMessageSet private[kafka](@volatile var file: File,
new IteratorTemplate[MessageAndOffset] {
var location = start
val sizeOffsetBuffer = ByteBuffer.allocate(12)
-
+
override def makeNext(): MessageAndOffset = {
if(location >= end)
return allDone()
-
+
// read the size of the item
sizeOffsetBuffer.rewind()
channel.read(sizeOffsetBuffer, location)
if(sizeOffsetBuffer.hasRemaining)
return allDone()
-
+
sizeOffsetBuffer.rewind()
val offset = sizeOffsetBuffer.getLong()
val size = sizeOffsetBuffer.getInt()
if(size < Message.MinHeaderSize)
return allDone()
if(size > maxMessageSize)
- throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
-
+ throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
+
// read the item itself
val buffer = ByteBuffer.allocate(size)
channel.read(buffer, location + 12)
if(buffer.hasRemaining)
return allDone()
buffer.rewind()
-
+
// increment the location and return the item
location += size + 12
new MessageAndOffset(new Message(buffer), offset)
}
}
}
-
+
/**
* The number of bytes taken up by this file set
*/
def sizeInBytes(): Int = _size.get()
-
+
/**
* Append these messages to the message set
*/
@@ -229,14 +230,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
val written = messages.writeTo(channel, 0, messages.sizeInBytes)
_size.getAndAdd(written)
}
-
+
/**
* Commit all written data to the physical disk
*/
def flush() = {
channel.force(true)
}
-
+
/**
* Close this message set
*/
@@ -245,14 +246,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
trim()
channel.close()
}
-
+
/**
* Trim file when close or roll to next file
*/
- def trim() {
+ def trim() {
truncateTo(sizeInBytes())
}
-
+
/**
* Delete this message set from the filesystem
* @return True iff this message set was deleted.
@@ -263,7 +264,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
/**
- * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
+ * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
* given size falls on a valid message boundary.
* @param targetSize The size to truncate to.
* @return The number of bytes truncated off
@@ -278,7 +279,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
_size.set(targetSize)
originalSize - targetSize
}
-
+
/**
* Read from the underlying file into the buffer starting at the given position
*/
@@ -287,7 +288,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
buffer.flip()
buffer
}
-
+
/**
* Rename the file that backs this message set
* @return true iff the rename was successful
@@ -297,9 +298,9 @@ class FileMessageSet private[kafka](@volatile var file: File,
this.file = f
success
}
-
+
}
-
+
object FileMessageSet
{
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 07164f6..32c194d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -27,6 +27,8 @@ import java.io.{IOException, File}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import java.text.NumberFormat
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+
import scala.collection.JavaConversions
import com.yammer.metrics.core.Gauge
@@ -50,18 +52,18 @@ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCode
/**
* An append-only log for storing messages.
- *
+ *
* The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
- *
+ *
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
- *
+ *
* @param dir The directory in which log segments are created.
* @param config The log configuration settings
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
- * @param time The time instance used for checking the clock
- *
+ * @param time The time instance used for checking the clock
+ *
*/
@threadsafe
class Log(val dir: File,
@@ -88,7 +90,7 @@ class Log(val dir: File,
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
loadSegments()
-
+
/* Calculate the offset of the next message */
@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
@@ -130,8 +132,8 @@ class Log(val dir: File,
// create the log directory if it doesn't exist
dir.mkdirs()
var swapFiles = Set[File]()
-
- // first do a pass through the files in the log directory and remove any temporary files
+
+ // first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
for(file <- dir.listFiles if file.isFile) {
if(!file.canRead)
@@ -170,9 +172,9 @@ class Log(val dir: File,
// if its a log file, load the corresponding log segment
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val indexFile = Log.indexFilename(dir, start)
- val segment = new LogSegment(dir = dir,
+ val segment = new LogSegment(dir = dir,
startOffset = start,
- indexIntervalBytes = config.indexInterval,
+ indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
@@ -195,7 +197,7 @@ class Log(val dir: File,
segments.put(start, segment)
}
}
-
+
// Finally, complete any interrupted swap operations. To be crash-safe,
// log files that are replaced by the swap segment should be renamed to .deleted
// before the swap file is restored as the new segment file.
@@ -221,7 +223,7 @@ class Log(val dir: File,
// no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
- indexIntervalBytes = config.indexInterval,
+ indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
@@ -239,7 +241,7 @@ class Log(val dir: File,
private def updateLogEndOffset(messageOffset: Long) {
nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
}
-
+
private def recoverLog() {
// if we have the clean shutdown marker, skip recovery
if(hasCleanShutdownFile) {
@@ -252,11 +254,11 @@ class Log(val dir: File,
while(unflushed.hasNext) {
val curr = unflushed.next
info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
- val truncatedBytes =
+ val truncatedBytes =
try {
curr.recover(config.maxMessageSize)
} catch {
- case e: InvalidOffsetException =>
+ case e: InvalidOffsetException =>
val startOffset = curr.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
@@ -269,7 +271,7 @@ class Log(val dir: File,
}
}
}
-
+
/**
* Check if we have the "clean shutdown" file
*/
@@ -280,7 +282,7 @@ class Log(val dir: File,
* Take care! this is an O(n) operation.
*/
def numberOfSegments: Int = segments.size
-
+
/**
* Close this log
*/
@@ -294,24 +296,24 @@ class Log(val dir: File,
/**
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
- *
- * This method will generally be responsible for assigning offsets to the messages,
+ *
+ * This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
- *
+ *
* @param messages The message set to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
- *
+ *
* @throws KafkaStorageException If the append fails due to an I/O error.
- *
+ *
* @return Information about the appended messages including the first and last offset.
*/
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateMessageSet(messages)
-
+
// if we have any valid messages, append them to the log
if(appendInfo.shallowCount == 0)
return appendInfo
-
+
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages, appendInfo)
@@ -320,36 +322,37 @@ class Log(val dir: File,
lock synchronized {
appendInfo.firstOffset = nextOffsetMetadata.messageOffset
- if(assignOffsets) {
+ if (assignOffsets) {
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
- validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
+ validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config
+ .compact)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
appendInfo.lastOffset = offset.get - 1
} else {
// we are taking the offsets we are given
- if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
+ if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
// re-validate message sizes since after re-compression some may exceed the limit
- for(messageAndOffset <- validMessages.shallowIterator) {
- if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+ for (messageAndOffset <- validMessages.shallowIterator) {
+ if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
// we record the original message set size instead of trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+ throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
}
}
// check messages set size may be exceed config.segmentSize
- if(validMessages.sizeInBytes > config.segmentSize) {
- throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+ if (validMessages.sizeInBytes > config.segmentSize) {
+ throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
.format(validMessages.sizeInBytes, config.segmentSize))
}
@@ -363,9 +366,9 @@ class Log(val dir: File,
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
- .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
+ .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
- if(unflushedMessages >= config.flushInterval)
+ if (unflushedMessages >= config.flushInterval)
flush()
appendInfo
@@ -374,14 +377,14 @@ class Log(val dir: File,
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
-
+
/**
* Validate the following:
* <ol>
* <li> each message matches its CRC
* <li> each message size is valid
* </ol>
- *
+ *
* Also compute the following quantities:
* <ol>
* <li> First offset in the message set
@@ -415,7 +418,7 @@ class Log(val dir: File,
if(messageSize > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+ throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(messageSize, config.maxMessageSize))
}
@@ -432,7 +435,7 @@ class Log(val dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-
+
LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
@@ -445,7 +448,7 @@ class Log(val dir: File,
private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
val messageSetValidBytes = info.validBytes
if(messageSetValidBytes < 0)
- throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+ throw new CorruptRecordException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
if(messageSetValidBytes == messages.sizeInBytes) {
messages
} else {
@@ -462,7 +465,7 @@ class Log(val dir: File,
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
- *
+ *
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
* @return The fetch data information including fetch starting offset metadata and messages read
*/
@@ -481,7 +484,7 @@ class Log(val dir: File,
// attempt to read beyond the log end offset is an error
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
-
+
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
@@ -510,7 +513,7 @@ class Log(val dir: File,
return fetchInfo
}
}
-
+
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
@@ -537,7 +540,7 @@ class Log(val dir: File,
* @return The number of segments deleted
*/
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
- // find any segments that match the user-supplied predicate UNLESS it is the final segment
+ // find any segments that match the user-supplied predicate UNLESS it is the final segment
// and it is empty (since we would just end up re-creating it
val lastSegment = activeSegment
val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
@@ -604,7 +607,7 @@ class Log(val dir: File,
segment
}
}
-
+
/**
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
@@ -620,17 +623,17 @@ class Log(val dir: File,
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
-
+
segments.lastEntry() match {
- case null =>
+ case null =>
case entry => {
entry.getValue.index.trimToValidSize()
entry.getValue.log.trim()
}
}
- val segment = new LogSegment(dir,
+ val segment = new LogSegment(dir,
startOffset = newOffset,
- indexIntervalBytes = config.indexInterval,
+ indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
@@ -645,18 +648,18 @@ class Log(val dir: File,
updateLogEndOffset(nextOffsetMetadata.messageOffset)
// schedule an asynchronous flush of the old segment
scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
+
info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
segment
}
}
-
+
/**
* The number of messages appended to the log since the last flush
*/
def unflushedMessages() = this.logEndOffset - this.recoveryPoint
-
+
/**
* Flush all log segments
*/
@@ -717,7 +720,7 @@ class Log(val dir: File,
}
}
}
-
+
/**
* Delete all data in the log and start at the new offset
* @param newOffset The new offset to start the log with
@@ -727,9 +730,9 @@ class Log(val dir: File,
lock synchronized {
val segmentsToDelete = logSegments.toList
segmentsToDelete.foreach(deleteSegment(_))
- addSegment(new LogSegment(dir,
+ addSegment(new LogSegment(dir,
newOffset,
- indexIntervalBytes = config.indexInterval,
+ indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
@@ -745,12 +748,12 @@ class Log(val dir: File,
* The time this log is last known to have been fully flushed to disk
*/
def lastFlushTime(): Long = lastflushedTime.get
-
+
/**
* The active segment that is currently taking appends
*/
def activeSegment = segments.lastEntry.getValue
-
+
/**
* All the log segments in this log ordered from oldest to newest
*/
@@ -758,7 +761,7 @@ class Log(val dir: File,
import JavaConversions._
segments.values
}
-
+
/**
* Get all segments beginning with the segment that includes "from" and ending with the segment
* that includes up to "to-1" or the end of the log (if to > logEndOffset)
@@ -773,9 +776,9 @@ class Log(val dir: File,
segments.subMap(floor, true, to, false).values
}
}
-
+
override def toString() = "Log(" + dir + ")"
-
+
/**
* This method performs an asynchronous log segment delete by doing the following:
* <ol>
@@ -785,7 +788,7 @@ class Log(val dir: File,
* </ol>
* This allows reads to happen concurrently without synchronization and without the possibility of physically
* deleting a file while it is being read from.
- *
+ *
* @param segment The log segment to schedule for deletion
*/
private def deleteSegment(segment: LogSegment) {
@@ -795,10 +798,10 @@ class Log(val dir: File,
asyncDeleteSegment(segment)
}
}
-
+
/**
* Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
- * @throws KafkaStorageException if the file can't be renamed and still exists
+ * @throws KafkaStorageException if the file can't be renamed and still exists
*/
private def asyncDeleteSegment(segment: LogSegment) {
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
@@ -808,11 +811,11 @@ class Log(val dir: File,
}
scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}
-
+
/**
* Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
* be asynchronously deleted.
- *
+ *
* The sequence of operations is:
* <ol>
* <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
@@ -828,7 +831,7 @@ class Log(val dir: File,
* If the broker crashes, any .deleted files which may be left behind are deleted
* on recovery in loadSegments().
* </ol>
- *
+ *
* @param newSegment The new log segment to add to the log
* @param oldSegments The old log segments to delete from the log
* @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
@@ -840,7 +843,7 @@ class Log(val dir: File,
if (!isRecoveredSwapFile)
newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
addSegment(newSegment)
-
+
// delete the old files
for(seg <- oldSegments) {
// remove the index entry
@@ -851,7 +854,7 @@ class Log(val dir: File,
}
// okay we are safe now, remove the swap suffix
newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
- }
+ }
}
/**
@@ -868,26 +871,26 @@ class Log(val dir: File,
* @param segment The segment to add
*/
def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
-
+
}
/**
* Helper functions for logs
*/
object Log {
-
+
/** a log file */
val LogFileSuffix = ".log"
-
+
/** an index file */
val IndexFileSuffix = ".index"
-
+
/** a file that is scheduled to be deleted */
val DeletedFileSuffix = ".deleted"
-
+
/** A temporary file that is being used for log cleaning */
val CleanedFileSuffix = ".cleaned"
-
+
/** A temporary file used when swapping files into the log */
val SwapFileSuffix = ".swap"
@@ -909,23 +912,23 @@ object Log {
nf.setGroupingUsed(false)
nf.format(offset)
}
-
+
/**
* Construct a log file name in the given dir with the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
- def logFilename(dir: File, offset: Long) =
+ def logFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
-
+
/**
* Construct an index file name in the given dir using the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
- def indexFilename(dir: File, offset: Long) =
+ def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
-
+
/**
* Parse the topic and partition out of the directory name of a log
@@ -951,4 +954,4 @@ object Log {
"directory")
}
}
-
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4de4c2b..d604d6c 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,6 +20,7 @@ import kafka.message._
import kafka.common._
import kafka.utils._
import kafka.server.{LogOffsetMetadata, FetchDataInfo}
+import org.apache.kafka.common.errors.CorruptRecordException
import scala.math._
import java.io.File
@@ -27,12 +28,12 @@ import java.io.File
/**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
- * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
+ * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
* any previous segment.
- *
- * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
- *
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
+ *
* @param log The message set containing log entries
* @param index The offset index
* @param baseOffset A lower bound on the offsets in this segment
@@ -40,18 +41,18 @@ import java.io.File
* @param time The time instance
*/
@nonthreadsafe
-class LogSegment(val log: FileMessageSet,
- val index: OffsetIndex,
- val baseOffset: Long,
+class LogSegment(val log: FileMessageSet,
+ val index: OffsetIndex,
+ val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
time: Time) extends Logging {
-
+
var created = time.milliseconds
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
-
+
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
@@ -59,16 +60,16 @@ class LogSegment(val log: FileMessageSet,
indexIntervalBytes,
rollJitterMs,
time)
-
+
/* Return the size in bytes of this log segment */
def size: Long = log.sizeInBytes()
-
+
/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
- *
+ *
* It is assumed this method is being called from within a lock.
- *
+ *
* @param offset The first offset in the message set.
* @param messages The messages to append.
*/
@@ -86,17 +87,17 @@ class LogSegment(val log: FileMessageSet,
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
-
+
/**
* Find the physical file position for the first message with offset >= the requested offset.
- *
+ *
* The lowerBound argument is an optimization that can be used if we already know a valid starting position
* in the file higher than the greatest-lower-bound from the index.
- *
+ *
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
- *
+ *
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
@threadsafe
@@ -108,12 +109,12 @@ class LogSegment(val log: FileMessageSet,
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
- *
+ *
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
* @param maxPosition An optional maximum position in the log segment that should be exposed for read.
- *
+ *
* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
* or null if the startOffset is larger than the largest offset in this log
*/
@@ -136,7 +137,7 @@ class LogSegment(val log: FileMessageSet,
return FetchDataInfo(offsetMetadata, MessageSet.Empty)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
- val length =
+ val length =
maxOffset match {
case None =>
// no max offset, just read until the max position
@@ -146,7 +147,7 @@ class LogSegment(val log: FileMessageSet,
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset, startPosition.position)
- val endPosition =
+ val endPosition =
if(mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
@@ -156,13 +157,13 @@ class LogSegment(val log: FileMessageSet,
}
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
-
+
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
- *
+ *
* @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
- *
+ *
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
@@ -191,7 +192,7 @@ class LogSegment(val log: FileMessageSet,
validBytes += MessageSet.entrySize(entry.message)
}
} catch {
- case e: InvalidMessageException =>
+ case e: CorruptRecordException =>
logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
@@ -222,7 +223,7 @@ class LogSegment(val log: FileMessageSet,
bytesSinceLastIndexEntry = 0
bytesTruncated
}
-
+
/**
* Calculate the offset that would be used for the next message to be append to this segment.
* Note that this is expensive.
@@ -239,7 +240,7 @@ class LogSegment(val log: FileMessageSet,
}
}
}
-
+
/**
* Flush this log segment to disk
*/
@@ -250,7 +251,7 @@ class LogSegment(val log: FileMessageSet,
index.flush()
}
}
-
+
/**
* Change the suffix for the index and log file for this log segment
*/
@@ -262,7 +263,7 @@ class LogSegment(val log: FileMessageSet,
if(!indexRenamed)
throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
}
-
+
/**
* Close this log segment
*/
@@ -270,7 +271,7 @@ class LogSegment(val log: FileMessageSet,
CoreUtils.swallow(index.close)
CoreUtils.swallow(log.close)
}
-
+
/**
* Delete this log segment from the filesystem.
* @throws KafkaStorageException if the delete fails.
@@ -283,12 +284,12 @@ class LogSegment(val log: FileMessageSet,
if(!deletedIndex && index.file.exists)
throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
}
-
+
/**
* The last modified time of this log segment as a unix time stamp
*/
def lastModified = log.file.lastModified
-
+
/**
* Change the last modified time for this log segment
*/
@@ -296,4 +297,4 @@ class LogSegment(val log: FileMessageSet,
log.file.setLastModified(ms)
index.file.setLastModified(ms)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/message/InvalidMessageException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/InvalidMessageException.scala b/core/src/main/scala/kafka/message/InvalidMessageException.scala
index 9f0d6e9..df22516 100644
--- a/core/src/main/scala/kafka/message/InvalidMessageException.scala
+++ b/core/src/main/scala/kafka/message/InvalidMessageException.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -17,9 +17,16 @@
package kafka.message
+import org.apache.kafka.common.errors.CorruptRecordException
+
/**
* Indicates that a message failed its checksum and is corrupt
+ *
+ * InvalidMessageException extends CorruptRecordException for temporary compatibility with the old Scala clients.
+ * We want to update the server side code to use and catch the new CorruptRecordException.
+ * Because ByteBufferMessageSet.scala and Message.scala are used in both server and client code having
+ * InvalidMessageException extend CorruptRecordException allows us to change server code without affecting the client.
*/
-class InvalidMessageException(message: String) extends RuntimeException(message) {
+class InvalidMessageException(message: String) extends CorruptRecordException(message) {
def this() = this(null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 13a8aa6..6fa00dd 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -16,11 +16,12 @@
*/
package kafka.producer
+import org.apache.kafka.common.protocol.Errors
+
import collection.mutable.HashMap
import kafka.api.TopicMetadata
import kafka.common.KafkaException
import kafka.utils.Logging
-import kafka.common.ErrorMapping
import kafka.client.ClientUtils
@@ -55,8 +56,8 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
}
val partitionMetadata = metadata.partitionsMetadata
if(partitionMetadata.size == 0) {
- if(metadata.errorCode != ErrorMapping.NoError) {
- throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
+ if(metadata.errorCode != Errors.NONE.code) {
+ throw new KafkaException(Errors.forCode(metadata.errorCode).exception)
} else {
throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
}
@@ -84,20 +85,20 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
// throw partition specific exception
topicsMetadata.foreach(tmd =>{
trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
- if(tmd.errorCode == ErrorMapping.NoError) {
+ if(tmd.errorCode == Errors.NONE.code) {
topicPartitionInfo.put(tmd.topic, tmd)
} else
- warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
+ warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, Errors.forCode(tmd.errorCode).exception.getClass))
tmd.partitionsMetadata.foreach(pmd =>{
- if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
+ if (pmd.errorCode != Errors.NONE.code && pmd.errorCode == Errors.LEADER_NOT_AVAILABLE.code) {
warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
- ErrorMapping.exceptionFor(pmd.errorCode).getClass))
+ Errors.forCode(pmd.errorCode).exception.getClass))
} // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
})
})
producerPool.updateProducer(topicsMetadata)
}
-
+
}
case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index a6179a9..5ca6ac2 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -22,6 +22,8 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{CoreUtils, Logging, SystemTime}
+import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.protocol.Errors
import scala.util.Random
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
@@ -261,11 +263,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
if (logger.isTraceEnabled) {
- val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+ val successfullySentData = response.status.filter(_._2.error == Errors.NONE.code)
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
}
- val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+ val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq
failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
if(failedTopicPartitions.size > 0) {
val errorString = failedPartitionsAndStatus
@@ -273,7 +275,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
(p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
.map{
case(topicAndPartition, status) =>
- topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
+ topicAndPartition.toString + ": " + Errors.forCode(status.error).exception.getClass.getName
}.mkString(",")
warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index bb39722..4e264ca 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -16,7 +16,8 @@
*/
package kafka.security.auth
-import kafka.common.{ErrorMapping, BaseEnum, KafkaException}
+import kafka.common.{BaseEnum, KafkaException}
+import org.apache.kafka.common.protocol.Errors
/**
* ResourceTypes.
@@ -27,17 +28,17 @@ sealed trait ResourceType extends BaseEnum { def errorCode: Short }
case object Cluster extends ResourceType {
val name = "Cluster"
- val errorCode = ErrorMapping.ClusterAuthorizationCode
+ val errorCode = Errors.CLUSTER_AUTHORIZATION_FAILED.code
}
case object Topic extends ResourceType {
val name = "Topic"
- val errorCode = ErrorMapping.TopicAuthorizationCode
+ val errorCode = Errors.TOPIC_AUTHORIZATION_FAILED.code
}
case object Group extends ResourceType {
val name = "Group"
- val errorCode = ErrorMapping.GroupAuthorizationCode
+ val errorCode = Errors.GROUP_AUTHORIZATION_FAILED.code
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index eba2d5a..b3873a6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,11 +21,12 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.BrokerEndPoint
import kafka.consumer.PartitionTopicInfo
-import kafka.message.{InvalidMessageException, MessageAndOffset, ByteBufferMessageSet}
+import kafka.message.{MessageAndOffset, ByteBufferMessageSet}
import kafka.utils.{Pool, ShutdownableThread, DelayedItem}
import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
+import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.protocol.Errors
import AbstractFetcherThread._
import scala.collection.{mutable, Set, Map}
@@ -137,7 +138,7 @@ abstract class AbstractFetcherThread(name: String,
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
} catch {
- case ime: InvalidMessageException =>
+ case ime: CorruptRecordException =>
// we log the error and continue. This ensures two things
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index de6cf5b..c8cb21d 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -21,10 +21,9 @@ import java.util.concurrent.TimeUnit
import kafka.api.FetchResponsePartitionData
import kafka.api.PartitionFetchInfo
-import kafka.common.UnknownTopicOrPartitionException
-import kafka.common.NotLeaderForPartitionException
import kafka.common.TopicAndPartition
import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
import scala.collection._
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 05078b2..c228807 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,10 +22,10 @@ import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Meter
import kafka.api.ProducerResponseStatus
-import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Pool
+import org.apache.kafka.common.protocol.Errors
import scala.collection._
@@ -58,10 +58,10 @@ class DelayedProduce(delayMs: Long,
// first update the acks pending variable according to the error code
produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
- if (status.responseStatus.error == ErrorMapping.NoError) {
+ if (status.responseStatus.error == Errors.NONE.code) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
- status.responseStatus.error = ErrorMapping.RequestTimedOutCode
+ status.responseStatus.error = Errors.REQUEST_TIMED_OUT.code
} else {
status.acksPending = false
}
@@ -92,16 +92,16 @@ class DelayedProduce(delayMs: Long,
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case None =>
// Case A
- (false, ErrorMapping.UnknownTopicOrPartitionCode)
+ (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
}
- if (errorCode != ErrorMapping.NoError) {
+ if (errorCode != Errors.NONE.code) {
// Case B.1
status.acksPending = false
status.responseStatus.error = errorCode
} else if (hasEnough) {
// Case B.2
status.acksPending = false
- status.responseStatus.error = ErrorMapping.NoError
+ status.responseStatus.error = Errors.NONE.code
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5fda0eb..018076e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,6 +32,8 @@ import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
+import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
+ClusterAuthorizationException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
@@ -193,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
- ErrorMapping.NoError, partitionsRemaining)
+ Errors.NONE.code, partitionsRemaining)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
}
@@ -206,7 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// reject the request immediately if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
- val errors = offsetCommitRequest.requestInfo.mapValues(_ => ErrorMapping.GroupAuthorizationCode)
+ val errors = offsetCommitRequest.requestInfo.mapValues(_ => Errors.GROUP_AUTHORIZATION_FAILED.code)
val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
return
@@ -225,16 +227,16 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
- val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
+ val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
- if (errorCode != ErrorMapping.NoError) {
+ if (errorCode != Errors.NONE.code) {
debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
.format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
- topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
+ topicAndPartition, Errors.forCode(errorCode).exception.getClass.getName))
}
}
- val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode)
+ val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
@@ -248,16 +250,16 @@ class KafkaApis(val requestChannel: RequestChannel,
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
try {
if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
- (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+ (topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
} else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
- (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+ (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
} else {
zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" +
topicAndPartition.partition, metaAndError.offset.toString)
- (topicAndPartition, ErrorMapping.NoError)
+ (topicAndPartition, Errors.NONE.code)
}
} catch {
- case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ case e: Throwable => (topicAndPartition, Errors.forException(e).code)
}
}
}
@@ -323,18 +325,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
- val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
+ val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
var errorInResponse = false
mergedResponseStatus.foreach { case (topicAndPartition, status) =>
- if (status.error != ErrorMapping.NoError) {
+ if (status.error != Errors.NONE.code) {
errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
produceRequest.correlationId,
produceRequest.clientId,
topicAndPartition,
- ErrorMapping.exceptionNameFor(status.error)))
+ Errors.forCode(status.error).exception.getClass.getName))
}
}
@@ -346,7 +348,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the producer client will know that some error has happened and will refresh its metadata
if (errorInResponse) {
val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
- topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
+ topicAndPartition -> Errors.forCode(status.error).exception.getClass.getName
}.mkString(", ")
info(
s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
@@ -406,17 +408,17 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
}
- val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.TopicAuthorizationCode, -1, MessageSet.Empty))
+ val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
// the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
mergedResponseStatus.foreach { case (topicAndPartition, data) =>
- if (data.error != ErrorMapping.NoError) {
+ if (data.error != Errors.NONE.code) {
debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s"
.format(fetchRequest.correlationId, fetchRequest.clientId,
- topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
+ topicAndPartition, Errors.forCode(data.error).exception.getClass.getName))
}
// record the bytes out metrics only when the response is being sent
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
@@ -503,14 +505,14 @@ class KafkaApis(val requestChannel: RequestChannel,
case utpe: UnknownTopicOrPartitionException =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition, utpe.getMessage))
- (topicPartition, new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), List[JLong]().asJava))
+ (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
case nle: NotLeaderForPartitionException =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition,nle.getMessage))
- (topicPartition, new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), List[JLong]().asJava))
+ (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
case e: Throwable =>
error("Error while responding to offset request", e)
- (topicPartition, new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), List[JLong]().asJava))
+ (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
}
})
@@ -600,15 +602,15 @@ class KafkaApis(val requestChannel: RequestChannel,
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.numPartitions, config.defaultReplicationFactor))
}
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
} catch {
case e: TopicExistsException => // let it go, possibly another broker created this topic
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
case itex: InvalidTopicException =>
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.InvalidTopicCode)
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.INVALID_TOPIC_EXCEPTION.code)
}
} else {
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
}
}
topicResponses.appendAll(responsesForNonExistentTopics)
@@ -646,7 +648,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
+ val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.TOPIC_AUTHORIZATION_FAILED.code))
val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
val brokers = metadataCache.getAliveBrokers
@@ -664,7 +666,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// reject the request immediately if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
- val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.GroupAuthorizationCode)
+ val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_AUTHORIZATION_FAILED.code)
val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
return
@@ -674,7 +676,7 @@ class KafkaApis(val requestChannel: RequestChannel,
authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
}
- val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.TopicAuthorizationCode)
+ val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.TOPIC_AUTHORIZATION_FAILED.code)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
val response = if (offsetFetchRequest.versionId == 0) {
@@ -694,7 +696,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} catch {
case e: Throwable =>
(topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,
- ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+ Errors.forException(e).code))
}
})
@@ -803,7 +805,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
val responseBody = new JoinGroupResponse(
- ErrorMapping.GroupAuthorizationCode,
+ Errors.GROUP_AUTHORIZATION_FAILED.code,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
@@ -838,7 +840,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
- sendResponseCallback(Array[Byte](), ErrorMapping.GroupAuthorizationCode)
+ sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code)
} else {
coordinator.handleSyncGroup(
syncGroupRequest.groupId(),
@@ -863,7 +865,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
- val heartbeatResponse = new HeartbeatResponse(ErrorMapping.GroupAuthorizationCode)
+ val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
}
else {
@@ -914,7 +916,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
- val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.GroupAuthorizationCode)
+ val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
} else {
// let the coordinator to handle leave-group
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 8120167..aaa6ea9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -46,11 +46,11 @@ import scala.collection.JavaConverters._
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.{EndPoint, Broker}
-import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
+import kafka.common.{InconsistentBrokerIdException, GenerateBrokerIdException}
import kafka.network.{BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{GroupConfig, GroupCoordinator}
+import kafka.coordinator.GroupCoordinator
object KafkaServer {
// Copy the subset of properties that are relevant to Logs
@@ -269,13 +269,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
}
val secureAclsEnabled = JaasUtils.isZkSecurityEnabled() && config.zkEnableSecureAcls
-
+
if(config.zkEnableSecureAcls && !secureAclsEnabled) {
- throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")
+ throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")
}
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
- val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
+ val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
secureAclsEnabled)
@@ -465,7 +465,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
response = channel.receive()
val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
- if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
+ if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.size == 0) {
shutdownSucceeded = true
info ("Controlled shutdown succeeded")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a9205f..f47a6aa 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -18,12 +18,12 @@
package kafka.server
import kafka.cluster.{BrokerEndPoint,Broker}
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
import kafka.common.TopicAndPartition
import kafka.api._
import kafka.controller.KafkaController.StateChangeLogger
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, LeaderNotAvailableException}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import scala.collection.{Seq, Set, mutable}
import kafka.utils.Logging
import kafka.utils.CoreUtils._
@@ -75,12 +75,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
if (isrInfo.size < isr.size)
throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
- new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+ new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
} catch {
case e: Throwable =>
debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
- ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ Errors.forException(e).code)
}
}
topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 75e6bae..5b1276e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,6 +29,8 @@ import kafka.log.{LogAppendInfo, LogManager}
import kafka.message.{ByteBufferMessageSet, MessageSet}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
+InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
@@ -44,8 +46,8 @@ import scala.collection.JavaConverters._
*/
case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) {
def errorCode = error match {
- case None => ErrorMapping.NoError
- case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ case None => Errors.NONE.code
+ case Some(e) => Errors.forException(e).code
}
}
@@ -65,8 +67,8 @@ case class LogReadResult(info: FetchDataInfo,
error: Option[Throwable] = None) {
def errorCode = error match {
- case None => ErrorMapping.NoError
- case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ case None => Errors.NONE.code
+ case Some(e) => Errors.forException(e).code
}
override def toString = {
@@ -216,7 +218,7 @@ class ReplicaManager(val config: KafkaConfig,
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
deletePartition.toString, topic, partitionId))
- val errorCode = ErrorMapping.NoError
+ val errorCode = Errors.NONE.code
getPartition(topic, partitionId) match {
case Some(partition) =>
if(deletePartition) {
@@ -248,7 +250,7 @@ class ReplicaManager(val config: KafkaConfig,
if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
.format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
- (responseMap, ErrorMapping.StaleControllerEpochCode)
+ (responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
val partitions = stopReplicaRequest.partitions.asScala
controllerEpoch = stopReplicaRequest.controllerEpoch
@@ -258,7 +260,7 @@ class ReplicaManager(val config: KafkaConfig,
val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
responseMap.put(topicPartition, errorCode)
}
- (responseMap, ErrorMapping.NoError)
+ (responseMap, Errors.NONE.code)
}
}
}
@@ -393,7 +395,6 @@ class ReplicaManager(val config: KafkaConfig,
// reject appending to internal topics if it is not allowed
if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
-
(topicAndPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)))))
@@ -433,11 +434,11 @@ class ReplicaManager(val config: KafkaConfig,
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
case nle: NotLeaderForPartitionException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
- case mtle: MessageSizeTooLargeException =>
+ case mtle: RecordTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
- case mstle: MessageSetSizeTooLargeException =>
+ case mstle: RecordBatchTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
- case imse : InvalidMessageSizeException =>
+ case imse: CorruptRecordException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
@@ -473,7 +474,7 @@ class ReplicaManager(val config: KafkaConfig,
// check if this fetch request can be satisfied right away
val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) =>
- errorIncurred || (readResult.errorCode != ErrorMapping.NoError))
+ errorIncurred || (readResult.errorCode != Errors.NONE.code))
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
@@ -599,7 +600,7 @@ class ReplicaManager(val config: KafkaConfig,
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
}
- BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
+ BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
val controllerId = leaderAndISRRequest.controllerId
controllerEpoch = leaderAndISRRequest.controllerEpoch
@@ -619,7 +620,7 @@ class ReplicaManager(val config: KafkaConfig,
"epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
.format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
- responseMap.put(topicPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+ responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
}
} else {
// Otherwise record the error code in response
@@ -627,7 +628,7 @@ class ReplicaManager(val config: KafkaConfig,
"epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
.format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
- responseMap.put(topicPartition, ErrorMapping.StaleLeaderEpochCode)
+ responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
}
}
@@ -654,7 +655,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
- BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.NoError)
+ BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
}
}
}
@@ -683,7 +684,7 @@ class ReplicaManager(val config: KafkaConfig,
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
for (partition <- partitionState.keys)
- responseMap.put(new TopicPartition(partition.topic, partition.partitionId), ErrorMapping.NoError)
+ responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
@@ -755,7 +756,7 @@ class ReplicaManager(val config: KafkaConfig,
}
for (partition <- partitionState.keys)
- responseMap.put(new TopicPartition(partition.topic, partition.partitionId), ErrorMapping.NoError)
+ responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 8af7614..5c01f34 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -19,12 +19,12 @@ package kafka.tools
import joptsimple._
-import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.consumer.SimpleConsumer
import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
-import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition}
-import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
+import org.apache.kafka.common.errors.BrokerNotAvailableException
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.JaasUtils
import scala.collection._
import kafka.client.ClientUtils
@@ -126,7 +126,7 @@ object ConsumerOffsetChecker extends Logging {
parser.accepts("broker-info", "Print broker info")
parser.accepts("help", "Print this message.")
-
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
@@ -187,10 +187,10 @@ object ConsumerOffsetChecker extends Logging {
throw z
}
}
- else if (offsetAndMetadata.error == ErrorMapping.NoError)
+ else if (offsetAndMetadata.error == Errors.NONE.code)
offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
else {
- println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+ println("Could not fetch offset for %s due to %s.".format(topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
}
}
channel.disconnect()
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index fd9daec..fe4968d 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -27,9 +27,10 @@ import java.util.regex.{PatternSyntaxException, Pattern}
import kafka.api._
import java.text.SimpleDateFormat
import java.util.Date
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.utils._
import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
+import org.apache.kafka.common.protocol.Errors
/**
* For verifying the consistency among replicas.
@@ -230,7 +231,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
offsetResponse.partitionErrorAndOffsets.filter {
- case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != ErrorMapping.NoError
+ case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != Errors.NONE.code
}.mkString
}
@@ -397,4 +398,4 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
verificationBarrier.await()
debug("Done verification")
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fccfdb6..a54cbef 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,7 @@ import java.util.concurrent.ExecutionException
import java.util.{ArrayList, Collections, Properties}
import kafka.cluster.EndPoint
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.coordinator.GroupCoordinator
import kafka.integration.KafkaServerTestHarness
import kafka.security.auth._