Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/07 06:25:25 UTC

[2/3] kafka git commit: KAFKA-2929: Migrate duplicate error mapping functionality
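
The change replaces the Scala-side error mapping (kafka.common.ErrorMapping and exceptions such as InvalidMessageException, MessageSizeTooLargeException and MessageSetSizeTooLargeException) with the shared Java classes in org.apache.kafka.common (protocol.Errors plus the errors package). As a minimal illustration of the two conversions that stand in for ErrorMapping.exceptionFor and ErrorMapping.codeFor (not code from the patch; it assumes the kafka-clients classes are on the classpath), something like:

    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
    import org.apache.kafka.common.protocol.Errors

    object ErrorMappingSketch extends App {
      // error code -> exception, replacing ErrorMapping.exceptionFor(code)
      val code: Short = Errors.UNKNOWN_TOPIC_OR_PARTITION.code
      println(Errors.forCode(code).exception.getClass.getName)

      // exception -> error code, replacing ErrorMapping.codeFor(clazz)
      val mapped: Short = Errors.forException(new UnknownTopicOrPartitionException("example")).code
      println(mapped == code) // true
    }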

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 949dc02..b239a6c 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -27,6 +27,7 @@ import kafka.message._
 import kafka.common.KafkaException
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.network.TransportLayer
 
 /**
@@ -44,9 +45,9 @@ class FileMessageSet private[kafka](@volatile var file: File,
                                     private[log] val start: Int,
                                     private[log] val end: Int,
                                     isSlice: Boolean) extends MessageSet with Logging {
-  
+
   /* the size of the message set in bytes */
-  private val _size = 
+  private val _size =
     if(isSlice)
       new AtomicInteger(end - start) // don't check the file size if this is just a slice view
     else
@@ -60,13 +61,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /**
    * Create a file message set with no slicing.
    */
-  def this(file: File, channel: FileChannel) = 
+  def this(file: File, channel: FileChannel) =
     this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
-  
+
   /**
    * Create a file message set with no slicing
    */
-  def this(file: File) = 
+  def this(file: File) =
     this(file, FileMessageSet.openChannel(file, mutable = true))
 
   /**
@@ -86,23 +87,23 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * Create a file message set with mutable option
    */
   def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
-  
+
   /**
    * Create a slice view of the file message set that begins and ends at the given byte offsets
    */
-  def this(file: File, channel: FileChannel, start: Int, end: Int) = 
+  def this(file: File, channel: FileChannel, start: Int, end: Int) =
     this(file, channel, start, end, isSlice = true)
-  
+
   /**
    * Return a message set which is a view into this set starting from the given position and with the given size limit.
-   * 
+   *
    * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
-   * 
+   *
    * If this message set is already sliced, the position will be taken relative to that slicing.
-   * 
+   *
    * @param position The start position to begin the read from
    * @param size The number of bytes after the start position to include
-   * 
+   *
    * @return A sliced wrapper on this message set limited based on the given position and size
    */
   def read(position: Int, size: Int): FileMessageSet = {
@@ -115,7 +116,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
                        start = this.start + position,
                        end = math.min(this.start + position + size, sizeInBytes()))
   }
-  
+
   /**
    * Search forward for the file position of the last offset that is greater than or equal to the target offset
    * and return its physical position. If no such offsets are found, return null.
@@ -143,7 +144,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
     }
     null
   }
-  
+
   /**
    * Write some of this set to the given channel.
    * @param destChannel The channel to write to.
@@ -168,15 +169,15 @@ class FileMessageSet private[kafka](@volatile var file: File,
       + " bytes requested for transfer : " + math.min(size, sizeInBytes))
     bytesTransferred
   }
-  
+
   /**
    * Get a shallow iterator over the messages in the set.
    */
   override def iterator() = iterator(Int.MaxValue)
-    
+
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
-   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory. 
+   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
    * If we encounter a message larger than this we throw an InvalidMessageException.
    * @return The iterator.
    */
@@ -184,44 +185,44 @@ class FileMessageSet private[kafka](@volatile var file: File,
     new IteratorTemplate[MessageAndOffset] {
       var location = start
       val sizeOffsetBuffer = ByteBuffer.allocate(12)
-      
+
       override def makeNext(): MessageAndOffset = {
         if(location >= end)
           return allDone()
-          
+
         // read the size of the item
         sizeOffsetBuffer.rewind()
         channel.read(sizeOffsetBuffer, location)
         if(sizeOffsetBuffer.hasRemaining)
           return allDone()
-        
+
         sizeOffsetBuffer.rewind()
         val offset = sizeOffsetBuffer.getLong()
         val size = sizeOffsetBuffer.getInt()
         if(size < Message.MinHeaderSize)
           return allDone()
         if(size > maxMessageSize)
-          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
-        
+          throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
+
         // read the item itself
         val buffer = ByteBuffer.allocate(size)
         channel.read(buffer, location + 12)
         if(buffer.hasRemaining)
           return allDone()
         buffer.rewind()
-        
+
         // increment the location and return the item
         location += size + 12
         new MessageAndOffset(new Message(buffer), offset)
       }
     }
   }
-  
+
   /**
    * The number of bytes taken up by this file set
    */
   def sizeInBytes(): Int = _size.get()
-  
+
   /**
    * Append these messages to the message set
    */
@@ -229,14 +230,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
     val written = messages.writeTo(channel, 0, messages.sizeInBytes)
     _size.getAndAdd(written)
   }
- 
+
   /**
    * Commit all written data to the physical disk
    */
   def flush() = {
     channel.force(true)
   }
-  
+
   /**
    * Close this message set
    */
@@ -245,14 +246,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
     trim()
     channel.close()
   }
-  
+
   /**
    * Trim file when close or roll to next file
    */
-  def trim() {    
+  def trim() {
     truncateTo(sizeInBytes())
   }
-  
+
   /**
    * Delete this message set from the filesystem
    * @return True iff this message set was deleted.
@@ -263,7 +264,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
 
   /**
-   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the 
+   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
    * given size falls on a valid message boundary.
    * @param targetSize The size to truncate to.
    * @return The number of bytes truncated off
@@ -278,7 +279,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
     _size.set(targetSize)
     originalSize - targetSize
   }
-  
+
   /**
    * Read from the underlying file into the buffer starting at the given position
    */
@@ -287,7 +288,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
     buffer.flip()
     buffer
   }
-  
+
   /**
    * Rename the file that backs this message set
    * @return true iff the rename was successful
@@ -297,9 +298,9 @@ class FileMessageSet private[kafka](@volatile var file: File,
     this.file = f
     success
   }
-  
+
 }
-  
+
 object FileMessageSet
 {
   /**
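
With this patch the shallow iterator above throws org.apache.kafka.common.errors.CorruptRecordException, rather than kafka.message.InvalidMessageException, when an entry claims a size larger than maxMessageSize. A rough sketch of a caller iterating defensively (illustrative only: the file name and size limit are made up, and it assumes the core classes from this tree are on the classpath):

    import java.io.File
    import kafka.log.FileMessageSet
    import org.apache.kafka.common.errors.CorruptRecordException

    object IterateSegmentSketch extends App {
      val messageSet = new FileMessageSet(new File("00000000000000000000.log"))
      try {
        messageSet.iterator(maxMessageSize = 1000000).foreach { entry =>
          println(s"offset=${entry.offset} bytes=${entry.message.size}")
        }
      } catch {
        case e: CorruptRecordException =>
          // previously this surfaced as kafka.message.InvalidMessageException
          println(s"stopped at corrupt entry: ${e.getMessage}")
      } finally {
        messageSet.close()
      }
    }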

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 07164f6..32c194d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -27,6 +27,8 @@ import java.io.{IOException, File}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+
 import scala.collection.JavaConversions
 
 import com.yammer.metrics.core.Gauge
@@ -50,18 +52,18 @@ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCode
 
 /**
  * An append-only log for storing messages.
- * 
+ *
  * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
- * 
+ *
  * New log segments are created according to a configurable policy that controls the size in bytes or time interval
  * for a given segment.
- * 
+ *
  * @param dir The directory in which log segments are created.
  * @param config The log configuration settings
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
- * @param time The time instance used for checking the clock 
- * 
+ * @param time The time instance used for checking the clock
+ *
  */
 @threadsafe
 class Log(val dir: File,
@@ -88,7 +90,7 @@ class Log(val dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
-  
+
   /* Calculate the offset of the next message */
   @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
 
@@ -130,8 +132,8 @@ class Log(val dir: File,
     // create the log directory if it doesn't exist
     dir.mkdirs()
     var swapFiles = Set[File]()
-    
-    // first do a pass through the files in the log directory and remove any temporary files 
+
+    // first do a pass through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     for(file <- dir.listFiles if file.isFile) {
       if(!file.canRead)
@@ -170,9 +172,9 @@ class Log(val dir: File,
         // if its a log file, load the corresponding log segment
         val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
         val indexFile = Log.indexFilename(dir, start)
-        val segment = new LogSegment(dir = dir, 
+        val segment = new LogSegment(dir = dir,
                                      startOffset = start,
-                                     indexIntervalBytes = config.indexInterval, 
+                                     indexIntervalBytes = config.indexInterval,
                                      maxIndexSize = config.maxIndexSize,
                                      rollJitterMs = config.randomSegmentJitter,
                                      time = time,
@@ -195,7 +197,7 @@ class Log(val dir: File,
         segments.put(start, segment)
       }
     }
-    
+
     // Finally, complete any interrupted swap operations. To be crash-safe,
     // log files that are replaced by the swap segment should be renamed to .deleted
     // before the swap file is restored as the new segment file.
@@ -221,7 +223,7 @@ class Log(val dir: File,
       // no existing segments, create a new mutable segment beginning at offset 0
       segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
-                                     indexIntervalBytes = config.indexInterval, 
+                                     indexIntervalBytes = config.indexInterval,
                                      maxIndexSize = config.maxIndexSize,
                                      rollJitterMs = config.randomSegmentJitter,
                                      time = time,
@@ -239,7 +241,7 @@ class Log(val dir: File,
   private def updateLogEndOffset(messageOffset: Long) {
     nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
   }
-  
+
   private def recoverLog() {
     // if we have the clean shutdown marker, skip recovery
     if(hasCleanShutdownFile) {
@@ -252,11 +254,11 @@ class Log(val dir: File,
     while(unflushed.hasNext) {
       val curr = unflushed.next
       info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
-      val truncatedBytes = 
+      val truncatedBytes =
         try {
           curr.recover(config.maxMessageSize)
         } catch {
-          case e: InvalidOffsetException => 
+          case e: InvalidOffsetException =>
             val startOffset = curr.baseOffset
             warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                  "creating an empty one with starting offset " + startOffset)
@@ -269,7 +271,7 @@ class Log(val dir: File,
       }
     }
   }
-  
+
   /**
    * Check if we have the "clean shutdown" file
    */
@@ -280,7 +282,7 @@ class Log(val dir: File,
    * Take care! this is an O(n) operation.
    */
   def numberOfSegments: Int = segments.size
-  
+
   /**
    * Close this log
    */
@@ -294,24 +296,24 @@ class Log(val dir: File,
 
   /**
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
-   * 
-   * This method will generally be responsible for assigning offsets to the messages, 
+   *
+   * This method will generally be responsible for assigning offsets to the messages,
    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
-   * 
+   *
    * @param messages The message set to append
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
-   * 
+   *
    * @throws KafkaStorageException If the append fails due to an I/O error.
-   * 
+   *
    * @return Information about the appended messages including the first and last offset.
    */
   def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
     val appendInfo = analyzeAndValidateMessageSet(messages)
-    
+
     // if we have any valid messages, append them to the log
     if(appendInfo.shallowCount == 0)
       return appendInfo
-      
+
     // trim any invalid bytes or partial messages before appending it to the on-disk log
     var validMessages = trimInvalidBytes(messages, appendInfo)
 
@@ -320,36 +322,37 @@ class Log(val dir: File,
       lock synchronized {
         appendInfo.firstOffset = nextOffsetMetadata.messageOffset
 
-        if(assignOffsets) {
+        if (assignOffsets) {
           // assign offsets to the message set
           val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
           try {
-            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
+            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config
+              .compact)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
           appendInfo.lastOffset = offset.get - 1
         } else {
           // we are taking the offsets we are given
-          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
+          if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in " + messages)
         }
 
         // re-validate message sizes since after re-compression some may exceed the limit
-        for(messageAndOffset <- validMessages.shallowIterator) {
-          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+        for (messageAndOffset <- validMessages.shallowIterator) {
+          if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
             // we record the original message set size instead of trimmed size
             // to be consistent with pre-compression bytesRejectedRate recording
             BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
-            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+            throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
               .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
           }
         }
 
         // check messages set size may be exceed config.segmentSize
-        if(validMessages.sizeInBytes > config.segmentSize) {
-          throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+        if (validMessages.sizeInBytes > config.segmentSize) {
+          throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
             .format(validMessages.sizeInBytes, config.segmentSize))
         }
 
@@ -363,9 +366,9 @@ class Log(val dir: File,
         updateLogEndOffset(appendInfo.lastOffset + 1)
 
         trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
-                .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
+          .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
 
-        if(unflushedMessages >= config.flushInterval)
+        if (unflushedMessages >= config.flushInterval)
           flush()
 
         appendInfo
@@ -374,14 +377,14 @@ class Log(val dir: File,
       case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
     }
   }
-  
+
   /**
    * Validate the following:
    * <ol>
    * <li> each message matches its CRC
    * <li> each message size is valid
    * </ol>
-   * 
+   *
    * Also compute the following quantities:
    * <ol>
    * <li> First offset in the message set
@@ -415,7 +418,7 @@ class Log(val dir: File,
       if(messageSize > config.maxMessageSize) {
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
         BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
-        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+        throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
           .format(messageSize, config.maxMessageSize))
       }
 
@@ -432,7 +435,7 @@ class Log(val dir: File,
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-    
+
     LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
@@ -445,7 +448,7 @@ class Log(val dir: File,
   private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
     val messageSetValidBytes = info.validBytes
     if(messageSetValidBytes < 0)
-      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+      throw new CorruptRecordException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
     if(messageSetValidBytes == messages.sizeInBytes) {
       messages
     } else {
@@ -462,7 +465,7 @@ class Log(val dir: File,
    * @param startOffset The offset to begin reading at
    * @param maxLength The maximum number of bytes to read
    * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
-   * 
+   *
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
    * @return The fetch data information including fetch starting offset metadata and messages read
    */
@@ -481,7 +484,7 @@ class Log(val dir: File,
     // attempt to read beyond the log end offset is an error
     if(startOffset > next || entry == null)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
-    
+
     // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
@@ -510,7 +513,7 @@ class Log(val dir: File,
         return fetchInfo
       }
     }
-    
+
     // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
     // this can happen when all messages with offset larger than start offsets have been deleted.
     // In this case, we will return the empty set with log end offset metadata
@@ -537,7 +540,7 @@ class Log(val dir: File,
    * @return The number of segments deleted
    */
   def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
-    // find any segments that match the user-supplied predicate UNLESS it is the final segment 
+    // find any segments that match the user-supplied predicate UNLESS it is the final segment
     // and it is empty (since we would just end up re-creating it
     val lastSegment = activeSegment
     val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
@@ -604,7 +607,7 @@ class Log(val dir: File,
       segment
     }
   }
-  
+
   /**
    * Roll the log over to a new active segment starting with the current logEndOffset.
    * This will trim the index to the exact size of the number of entries it currently contains.
@@ -620,17 +623,17 @@ class Log(val dir: File,
         warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
         file.delete()
       }
-    
+
       segments.lastEntry() match {
-        case null => 
+        case null =>
         case entry => {
           entry.getValue.index.trimToValidSize()
           entry.getValue.log.trim()
         }
       }
-      val segment = new LogSegment(dir, 
+      val segment = new LogSegment(dir,
                                    startOffset = newOffset,
-                                   indexIntervalBytes = config.indexInterval, 
+                                   indexIntervalBytes = config.indexInterval,
                                    maxIndexSize = config.maxIndexSize,
                                    rollJitterMs = config.randomSegmentJitter,
                                    time = time,
@@ -645,18 +648,18 @@ class Log(val dir: File,
       updateLogEndOffset(nextOffsetMetadata.messageOffset)
       // schedule an asynchronous flush of the old segment
       scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-      
+
       info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
 
       segment
     }
   }
-  
+
   /**
    * The number of messages appended to the log since the last flush
    */
   def unflushedMessages() = this.logEndOffset - this.recoveryPoint
-  
+
   /**
    * Flush all log segments
    */
@@ -717,7 +720,7 @@ class Log(val dir: File,
       }
     }
   }
-    
+
   /**
    *  Delete all data in the log and start at the new offset
    *  @param newOffset The new offset to start the log with
@@ -727,9 +730,9 @@ class Log(val dir: File,
     lock synchronized {
       val segmentsToDelete = logSegments.toList
       segmentsToDelete.foreach(deleteSegment(_))
-      addSegment(new LogSegment(dir, 
+      addSegment(new LogSegment(dir,
                                 newOffset,
-                                indexIntervalBytes = config.indexInterval, 
+                                indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
                                 time = time,
@@ -745,12 +748,12 @@ class Log(val dir: File,
    * The time this log is last known to have been fully flushed to disk
    */
   def lastFlushTime(): Long = lastflushedTime.get
-  
+
   /**
    * The active segment that is currently taking appends
    */
   def activeSegment = segments.lastEntry.getValue
-  
+
   /**
    * All the log segments in this log ordered from oldest to newest
    */
@@ -758,7 +761,7 @@ class Log(val dir: File,
     import JavaConversions._
     segments.values
   }
-  
+
   /**
    * Get all segments beginning with the segment that includes "from" and ending with the segment
    * that includes up to "to-1" or the end of the log (if to > logEndOffset)
@@ -773,9 +776,9 @@ class Log(val dir: File,
         segments.subMap(floor, true, to, false).values
     }
   }
-  
+
   override def toString() = "Log(" + dir + ")"
-  
+
   /**
    * This method performs an asynchronous log segment delete by doing the following:
    * <ol>
@@ -785,7 +788,7 @@ class Log(val dir: File,
    * </ol>
    * This allows reads to happen concurrently without synchronization and without the possibility of physically
    * deleting a file while it is being read from.
-   * 
+   *
    * @param segment The log segment to schedule for deletion
    */
   private def deleteSegment(segment: LogSegment) {
@@ -795,10 +798,10 @@ class Log(val dir: File,
       asyncDeleteSegment(segment)
     }
   }
-  
+
   /**
    * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
-   * @throws KafkaStorageException if the file can't be renamed and still exists 
+   * @throws KafkaStorageException if the file can't be renamed and still exists
    */
   private def asyncDeleteSegment(segment: LogSegment) {
     segment.changeFileSuffixes("", Log.DeletedFileSuffix)
@@ -808,11 +811,11 @@ class Log(val dir: File,
     }
     scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
   }
-  
+
   /**
    * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
    * be asynchronously deleted.
-   * 
+   *
    * The sequence of operations is:
    * <ol>
    *   <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
@@ -828,7 +831,7 @@ class Log(val dir: File,
    *        If the broker crashes, any .deleted files which may be left behind are deleted
    *        on recovery in loadSegments().
    * </ol>
-   * 
+   *
    * @param newSegment The new log segment to add to the log
    * @param oldSegments The old log segments to delete from the log
    * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
@@ -840,7 +843,7 @@ class Log(val dir: File,
       if (!isRecoveredSwapFile)
         newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
       addSegment(newSegment)
-        
+
       // delete the old files
       for(seg <- oldSegments) {
         // remove the index entry
@@ -851,7 +854,7 @@ class Log(val dir: File,
       }
       // okay we are safe now, remove the swap suffix
       newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
-    }  
+    }
   }
 
   /**
@@ -868,26 +871,26 @@ class Log(val dir: File,
    * @param segment The segment to add
    */
   def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
-  
+
 }
 
 /**
  * Helper functions for logs
  */
 object Log {
-  
+
   /** a log file */
   val LogFileSuffix = ".log"
-    
+
   /** an index file */
   val IndexFileSuffix = ".index"
-    
+
   /** a file that is scheduled to be deleted */
   val DeletedFileSuffix = ".deleted"
-    
+
   /** A temporary file that is being used for log cleaning */
   val CleanedFileSuffix = ".cleaned"
-    
+
   /** A temporary file used when swapping files into the log */
   val SwapFileSuffix = ".swap"
 
@@ -909,23 +912,23 @@ object Log {
     nf.setGroupingUsed(false)
     nf.format(offset)
   }
-  
+
   /**
    * Construct a log file name in the given dir with the given base offset
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def logFilename(dir: File, offset: Long) = 
+  def logFilename(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
-  
+
   /**
    * Construct an index file name in the given dir using the given base offset
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def indexFilename(dir: File, offset: Long) = 
+  def indexFilename(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
-  
+
 
   /**
    * Parse the topic and partition out of the directory name of a log
@@ -951,4 +954,4 @@ object Log {
       "directory")
   }
 }
-  
+
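
The append path above now reports oversize and corrupt input with the shared exceptions: RecordTooLargeException when a single message exceeds the configured maximum message size, RecordBatchTooLargeException when the message set exceeds the segment size, and CorruptRecordException when the valid byte count is negative. A sketch of how a caller might handle the remapped exceptions (the safeAppend helper is hypothetical, not part of the patch):

    import kafka.log.Log
    import kafka.message.ByteBufferMessageSet
    import org.apache.kafka.common.errors.{CorruptRecordException, RecordBatchTooLargeException, RecordTooLargeException}

    object AppendSketch {
      def safeAppend(log: Log, messages: ByteBufferMessageSet): Unit =
        try {
          val info = log.append(messages)
          println(s"appended offsets ${info.firstOffset}..${info.lastOffset}")
        } catch {
          case e: RecordTooLargeException =>
            println(s"a single message is over the configured limit: ${e.getMessage}")
          case e: RecordBatchTooLargeException =>
            println(s"the message set is larger than the segment size: ${e.getMessage}")
          case e: CorruptRecordException =>
            println(s"invalid message set length: ${e.getMessage}")
        }
    }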

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4de4c2b..d604d6c 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,6 +20,7 @@ import kafka.message._
 import kafka.common._
 import kafka.utils._
 import kafka.server.{LogOffsetMetadata, FetchDataInfo}
+import org.apache.kafka.common.errors.CorruptRecordException
 
 import scala.math._
 import java.io.File
@@ -27,12 +28,12 @@ import java.io.File
 
  /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
- * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each 
+ * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
  * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
  * any previous segment.
- * 
- * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. 
- * 
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
+ *
  * @param log The message set containing log entries
  * @param index The offset index
  * @param baseOffset A lower bound on the offsets in this segment
@@ -40,18 +41,18 @@ import java.io.File
  * @param time The time instance
  */
 @nonthreadsafe
-class LogSegment(val log: FileMessageSet, 
-                 val index: OffsetIndex, 
-                 val baseOffset: Long, 
+class LogSegment(val log: FileMessageSet,
+                 val index: OffsetIndex,
+                 val baseOffset: Long,
                  val indexIntervalBytes: Int,
                  val rollJitterMs: Long,
                  time: Time) extends Logging {
-  
+
   var created = time.milliseconds
 
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
-  
+
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
          new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
@@ -59,16 +60,16 @@ class LogSegment(val log: FileMessageSet,
          indexIntervalBytes,
          rollJitterMs,
          time)
-    
+
   /* Return the size in bytes of this log segment */
   def size: Long = log.sizeInBytes()
-  
+
   /**
    * Append the given messages starting with the given offset. Add
    * an entry to the index if needed.
-   * 
+   *
    * It is assumed this method is being called from within a lock.
-   * 
+   *
    * @param offset The first offset in the message set.
    * @param messages The messages to append.
    */
@@ -86,17 +87,17 @@ class LogSegment(val log: FileMessageSet,
       this.bytesSinceLastIndexEntry += messages.sizeInBytes
     }
   }
-  
+
   /**
    * Find the physical file position for the first message with offset >= the requested offset.
-   * 
+   *
    * The lowerBound argument is an optimization that can be used if we already know a valid starting position
    * in the file higher than the greatest-lower-bound from the index.
-   * 
+   *
    * @param offset The offset we want to translate
    * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
    * when omitted, the search will begin at the position in the offset index.
-   * 
+   *
    * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
    */
   @threadsafe
@@ -108,12 +109,12 @@ class LogSegment(val log: FileMessageSet,
   /**
    * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
-   * 
+   *
    * @param startOffset A lower bound on the first offset to include in the message set we read
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
    * @param maxPosition An optional maximum position in the log segment that should be exposed for read.
-   * 
+   *
    * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in this log
    */
@@ -136,7 +137,7 @@ class LogSegment(val log: FileMessageSet,
       return FetchDataInfo(offsetMetadata, MessageSet.Empty)
 
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
-    val length = 
+    val length =
       maxOffset match {
         case None =>
           // no max offset, just read until the max position
@@ -146,7 +147,7 @@ class LogSegment(val log: FileMessageSet,
           if(offset < startOffset)
             throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
           val mapping = translateOffset(offset, startPosition.position)
-          val endPosition = 
+          val endPosition =
             if(mapping == null)
               logSize // the max offset is off the end of the log, use the end of the file
             else
@@ -156,13 +157,13 @@ class LogSegment(val log: FileMessageSet,
       }
     FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
   }
-  
+
   /**
    * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
-   * 
+   *
    * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
    * is corrupt.
-   * 
+   *
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
@@ -191,7 +192,7 @@ class LogSegment(val log: FileMessageSet,
         validBytes += MessageSet.entrySize(entry.message)
       }
     } catch {
-      case e: InvalidMessageException => 
+      case e: CorruptRecordException =>
         logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
     }
     val truncated = log.sizeInBytes - validBytes
@@ -222,7 +223,7 @@ class LogSegment(val log: FileMessageSet,
     bytesSinceLastIndexEntry = 0
     bytesTruncated
   }
-  
+
   /**
    * Calculate the offset that would be used for the next message to be append to this segment.
    * Note that this is expensive.
@@ -239,7 +240,7 @@ class LogSegment(val log: FileMessageSet,
       }
     }
   }
-  
+
   /**
    * Flush this log segment to disk
    */
@@ -250,7 +251,7 @@ class LogSegment(val log: FileMessageSet,
       index.flush()
     }
   }
-  
+
   /**
    * Change the suffix for the index and log file for this log segment
    */
@@ -262,7 +263,7 @@ class LogSegment(val log: FileMessageSet,
     if(!indexRenamed)
       throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
   }
-  
+
   /**
    * Close this log segment
    */
@@ -270,7 +271,7 @@ class LogSegment(val log: FileMessageSet,
     CoreUtils.swallow(index.close)
     CoreUtils.swallow(log.close)
   }
-  
+
   /**
    * Delete this log segment from the filesystem.
    * @throws KafkaStorageException if the delete fails.
@@ -283,12 +284,12 @@ class LogSegment(val log: FileMessageSet,
     if(!deletedIndex && index.file.exists)
       throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
   }
-  
+
   /**
    * The last modified time of this log segment as a unix time stamp
    */
   def lastModified = log.file.lastModified
-  
+
   /**
    * Change the last modified time for this log segment
    */
@@ -296,4 +297,4 @@ class LogSegment(val log: FileMessageSet,
     log.file.setLastModified(ms)
     index.file.setLastModified(ms)
   }
-}
\ No newline at end of file
+}
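
recover() above now stops scanning at a CorruptRecordException from the shallow iterator (previously InvalidMessageException) and truncates the log and index to the last valid bytes. A small hypothetical wrapper showing the call, with a made-up default for maxMessageSize:

    import kafka.log.LogSegment

    object RecoverSketch {
      // Rebuilds the index, drops trailing invalid bytes, returns the bytes truncated.
      def recoverSegment(segment: LogSegment, maxMessageSize: Int = 1000000): Int = {
        val truncatedBytes = segment.recover(maxMessageSize)
        if (truncatedBytes > 0)
          println(s"recovery truncated $truncatedBytes bytes from segment ${segment.baseOffset}")
        truncatedBytes
      }
    }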

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/message/InvalidMessageException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/InvalidMessageException.scala b/core/src/main/scala/kafka/message/InvalidMessageException.scala
index 9f0d6e9..df22516 100644
--- a/core/src/main/scala/kafka/message/InvalidMessageException.scala
+++ b/core/src/main/scala/kafka/message/InvalidMessageException.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,9 +17,16 @@
 
 package kafka.message
 
+import org.apache.kafka.common.errors.CorruptRecordException
+
 /**
  * Indicates that a message failed its checksum and is corrupt
+ *
+ * InvalidMessageException extends CorruptRecordException for temporary compatibility with the old Scala clients.
+ * We want to update the server side code to use and catch the new CorruptRecordException.
+ * Because ByteBufferMessageSet.scala and Message.scala are used in both server and client code having
+ * InvalidMessageException extend CorruptRecordException allows us to change server code without affecting the client.
  */
-class InvalidMessageException(message: String) extends RuntimeException(message) {
+class InvalidMessageException(message: String) extends CorruptRecordException(message) {
   def this() = this(null)
 }
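
The subclassing above is what keeps the shared client/server code compatible: anything that still throws InvalidMessageException is now caught by handlers written against the new CorruptRecordException. A tiny illustrative check (not from the patch):

    import kafka.message.InvalidMessageException
    import org.apache.kafka.common.errors.CorruptRecordException

    object CompatSketch extends App {
      try {
        // e.g. what the message CRC check still throws
        throw new InvalidMessageException("CRC mismatch")
      } catch {
        case e: CorruptRecordException =>
          println(s"caught via the new type: ${e.getMessage}")
      }
    }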

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 13a8aa6..6fa00dd 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -16,11 +16,12 @@
 */
 package kafka.producer
 
+import org.apache.kafka.common.protocol.Errors
+
 import collection.mutable.HashMap
 import kafka.api.TopicMetadata
 import kafka.common.KafkaException
 import kafka.utils.Logging
-import kafka.common.ErrorMapping
 import kafka.client.ClientUtils
 
 
@@ -55,8 +56,8 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       }
     val partitionMetadata = metadata.partitionsMetadata
     if(partitionMetadata.size == 0) {
-      if(metadata.errorCode != ErrorMapping.NoError) {
-        throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
+      if(metadata.errorCode != Errors.NONE.code) {
+        throw new KafkaException(Errors.forCode(metadata.errorCode).exception)
       } else {
         throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
       }
@@ -84,20 +85,20 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
       trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-      if(tmd.errorCode == ErrorMapping.NoError) {
+      if(tmd.errorCode == Errors.NONE.code) {
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
-        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
+        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, Errors.forCode(tmd.errorCode).exception.getClass))
       tmd.partitionsMetadata.foreach(pmd =>{
-        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
+        if (pmd.errorCode != Errors.NONE.code && pmd.errorCode == Errors.LEADER_NOT_AVAILABLE.code) {
           warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
-            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
+            Errors.forCode(pmd.errorCode).exception.getClass))
         } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
       })
     })
     producerPool.updateProducer(topicsMetadata)
   }
-  
+
 }
 
 case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
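
The metadata checks above now go through Errors rather than ErrorMapping, turning a non-zero error code back into its exception with Errors.forCode(code).exception. A compact sketch of the empty-partition-metadata check (the helper name and inputs are hypothetical):

    import kafka.common.KafkaException
    import org.apache.kafka.common.protocol.Errors

    object MetadataCheckSketch {
      def failIfEmptyPartitionMetadata(topic: String, errorCode: Short, partitionCount: Int): Unit =
        if (partitionCount == 0) {
          if (errorCode != Errors.NONE.code)
            throw new KafkaException(Errors.forCode(errorCode).exception)
          else
            throw new KafkaException(s"Topic metadata for $topic has empty partition metadata and no error code")
        }
    }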

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index a6179a9..5ca6ac2 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -22,6 +22,8 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils.{CoreUtils, Logging, SystemTime}
+import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.protocol.Errors
 import scala.util.Random
 import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
@@ -261,11 +263,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           if (response.status.size != producerRequest.data.size)
             throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
           if (logger.isTraceEnabled) {
-            val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+            val successfullySentData = response.status.filter(_._2.error == Errors.NONE.code)
             successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
               trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
           }
-          val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+          val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq
           failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
           if(failedTopicPartitions.size > 0) {
             val errorString = failedPartitionsAndStatus
@@ -273,7 +275,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                     (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
               .map{
                 case(topicAndPartition, status) =>
-                  topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
+                  topicAndPartition.toString + ": " + Errors.forCode(status.error).exception.getClass.getName
               }.mkString(",")
             warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index bb39722..4e264ca 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -16,7 +16,8 @@
  */
 package kafka.security.auth
 
-import kafka.common.{ErrorMapping, BaseEnum, KafkaException}
+import kafka.common.{BaseEnum, KafkaException}
+import org.apache.kafka.common.protocol.Errors
 
 /**
  * ResourceTypes.
@@ -27,17 +28,17 @@ sealed trait ResourceType extends BaseEnum { def errorCode: Short }
 
 case object Cluster extends ResourceType {
   val name = "Cluster"
-  val errorCode = ErrorMapping.ClusterAuthorizationCode
+  val errorCode = Errors.CLUSTER_AUTHORIZATION_FAILED.code
 }
 
 case object Topic extends ResourceType {
   val name = "Topic"
-  val errorCode = ErrorMapping.TopicAuthorizationCode
+  val errorCode = Errors.TOPIC_AUTHORIZATION_FAILED.code
 }
 
 case object Group extends ResourceType {
   val name = "Group"
-  val errorCode = ErrorMapping.GroupAuthorizationCode
+  val errorCode = Errors.GROUP_AUTHORIZATION_FAILED.code
 }
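
After this change the resource types take their authorization error codes straight from Errors, so the numeric values stay aligned with the shared protocol definitions. A trivial illustrative check (assumes the core and clients classes on the classpath):

    import kafka.security.auth.{Cluster, Group, Topic}
    import org.apache.kafka.common.protocol.Errors

    object ResourceTypeSketch extends App {
      assert(Cluster.errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code)
      assert(Topic.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code)
      assert(Group.errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code)
      println("resource-type error codes match the shared protocol errors")
    }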
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index eba2d5a..b3873a6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,11 +21,12 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.BrokerEndPoint
 import kafka.consumer.PartitionTopicInfo
-import kafka.message.{InvalidMessageException, MessageAndOffset, ByteBufferMessageSet}
+import kafka.message.{MessageAndOffset, ByteBufferMessageSet}
 import kafka.utils.{Pool, ShutdownableThread, DelayedItem}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
+import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
 import scala.collection.{mutable, Set, Map}
@@ -137,7 +138,7 @@ abstract class AbstractFetcherThread(name: String,
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
                   } catch {
-                    case ime: InvalidMessageException =>
+                    case ime: CorruptRecordException =>
                       // we log the error and continue. This ensures two things
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index de6cf5b..c8cb21d 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -21,10 +21,9 @@ import java.util.concurrent.TimeUnit
 
 import kafka.api.FetchResponsePartitionData
 import kafka.api.PartitionFetchInfo
-import kafka.common.UnknownTopicOrPartitionException
-import kafka.common.NotLeaderForPartitionException
 import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
 
 import scala.collection._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 05078b2..c228807 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,10 +22,10 @@ import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Meter
 import kafka.api.ProducerResponseStatus
-import kafka.common.ErrorMapping
 import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
+import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
 
@@ -58,10 +58,10 @@ class DelayedProduce(delayMs: Long,
 
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-    if (status.responseStatus.error == ErrorMapping.NoError) {
+    if (status.responseStatus.error == Errors.NONE.code) {
       // Timeout error state will be cleared when required acks are received
       status.acksPending = true
-      status.responseStatus.error = ErrorMapping.RequestTimedOutCode
+      status.responseStatus.error = Errors.REQUEST_TIMED_OUT.code
     } else {
       status.acksPending = false
     }
@@ -92,16 +92,16 @@ class DelayedProduce(delayMs: Long,
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
-            (false, ErrorMapping.UnknownTopicOrPartitionCode)
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
         }
-        if (errorCode != ErrorMapping.NoError) {
+        if (errorCode != Errors.NONE.code) {
           // Case B.1
           status.acksPending = false
           status.responseStatus.error = errorCode
         } else if (hasEnough) {
           // Case B.2
           status.acksPending = false
-          status.responseStatus.error = ErrorMapping.NoError
+          status.responseStatus.error = Errors.NONE.code
         }
       }
     }
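
The pending-ack bookkeeping above now uses the Errors constants directly: a partition with no initial error is marked pending and pre-set to REQUEST_TIMED_OUT, then cleared once enough replicas reach the required offset. A minimal sketch of that state transition using a hypothetical status holder:

    import org.apache.kafka.common.protocol.Errors

    object ProduceStatusSketch extends App {
      // Hypothetical stand-in for the per-partition produce status.
      final case class PendingStatus(var acksPending: Boolean, var error: Short)

      val status = PendingStatus(acksPending = false, error = Errors.NONE.code)

      // On creation: no error yet, so mark it pending and pre-set the timeout code.
      if (status.error == Errors.NONE.code) {
        status.acksPending = true
        status.error = Errors.REQUEST_TIMED_OUT.code
      }

      // Later, once enough replicas have acknowledged the required offset:
      status.acksPending = false
      status.error = Errors.NONE.code

      println(s"acksPending=${status.acksPending} error=${Errors.forCode(status.error)}")
    }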

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5fda0eb..018076e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,6 +32,8 @@ import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
+import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
+ClusterAuthorizationException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
@@ -193,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
     val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
-      ErrorMapping.NoError, partitionsRemaining)
+      Errors.NONE.code, partitionsRemaining)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
   }
 
@@ -206,7 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // reject the request immediately if not authorized to the group
     if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
-      val errors = offsetCommitRequest.requestInfo.mapValues(_ => ErrorMapping.GroupAuthorizationCode)
+      val errors = offsetCommitRequest.requestInfo.mapValues(_ => Errors.GROUP_AUTHORIZATION_FAILED.code)
       val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
       return
@@ -225,16 +227,16 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending an offset commit response
     def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
-      val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
+      val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
 
       mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
-        if (errorCode != ErrorMapping.NoError) {
+        if (errorCode != Errors.NONE.code) {
           debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
             .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
-              topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
+              topicAndPartition, Errors.forCode(errorCode).exception.getClass.getName))
         }
       }
-      val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode)
+      val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
       val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
@@ -248,16 +250,16 @@ class KafkaApis(val requestChannel: RequestChannel,
           val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
           try {
             if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
-              (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+              (topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
             } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
-              (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+              (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
             } else {
               zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" +
                 topicAndPartition.partition, metaAndError.offset.toString)
-              (topicAndPartition, ErrorMapping.NoError)
+              (topicAndPartition, Errors.NONE.code)
             }
           } catch {
-            case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            case e: Throwable => (topicAndPartition, Errors.forException(e).code)
           }
         }
       }
@@ -323,18 +325,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
 
-      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
+      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
 
       var errorInResponse = false
 
       mergedResponseStatus.foreach { case (topicAndPartition, status) =>
-        if (status.error != ErrorMapping.NoError) {
+        if (status.error != Errors.NONE.code) {
           errorInResponse = true
           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
             produceRequest.correlationId,
             produceRequest.clientId,
             topicAndPartition,
-            ErrorMapping.exceptionNameFor(status.error)))
+            Errors.forCode(status.error).exception.getClass.getName))
         }
       }
 
@@ -346,7 +348,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           // the producer client will know that some error has happened and will refresh its metadata
           if (errorInResponse) {
             val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
-              topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
+              topicAndPartition -> Errors.forCode(status.error).exception.getClass.getName
             }.mkString(", ")
             info(
               s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
@@ -406,17 +408,17 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.TopicAuthorizationCode, -1, MessageSet.Empty))
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
 
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
       val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
 
       mergedResponseStatus.foreach { case (topicAndPartition, data) =>
-        if (data.error != ErrorMapping.NoError) {
+        if (data.error != Errors.NONE.code) {
           debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s"
             .format(fetchRequest.correlationId, fetchRequest.clientId,
-            topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
+            topicAndPartition, Errors.forCode(data.error).exception.getClass.getName))
         }
         // record the bytes out metrics only when the response is being sent
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
@@ -503,14 +505,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         case utpe: UnknownTopicOrPartitionException =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                correlationId, clientId, topicPartition, utpe.getMessage))
-          (topicPartition,  new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), List[JLong]().asJava))
+          (topicPartition,  new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
         case nle: NotLeaderForPartitionException =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                correlationId, clientId, topicPartition,nle.getMessage))
-          (topicPartition,  new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), List[JLong]().asJava))
+          (topicPartition,  new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
         case e: Throwable =>
           error("Error while responding to offset request", e)
-          (topicPartition,  new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), List[JLong]().asJava))
+          (topicPartition,  new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
       }
     })
 
@@ -600,15 +602,15 @@ class KafkaApis(val requestChannel: RequestChannel,
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                    .format(topic, config.numPartitions, config.defaultReplicationFactor))
             }
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+            new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
           } catch {
             case e: TopicExistsException => // let it go, possibly another broker created this topic
-              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+              new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
             case itex: InvalidTopicException =>
-              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.InvalidTopicCode)
+              new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.INVALID_TOPIC_EXCEPTION.code)
           }
         } else {
-          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
         }
       }
       topicResponses.appendAll(responsesForNonExistentTopics)
@@ -646,7 +648,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
+    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.TOPIC_AUTHORIZATION_FAILED.code))
 
     val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
@@ -664,7 +666,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // reject the request immediately if not authorized to the group
     if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
-      val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.GroupAuthorizationCode)
+      val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_AUTHORIZATION_FAILED.code)
       val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
       return
@@ -674,7 +676,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
     }
 
-    val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.TopicAuthorizationCode)
+    val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.TOPIC_AUTHORIZATION_FAILED.code)
     val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
 
     val response = if (offsetFetchRequest.versionId == 0) {
@@ -694,7 +696,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         } catch {
           case e: Throwable =>
             (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+              Errors.forException(e).code))
         }
       })
 
@@ -803,7 +805,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
       val responseBody = new JoinGroupResponse(
-        ErrorMapping.GroupAuthorizationCode,
+        Errors.GROUP_AUTHORIZATION_FAILED.code,
         JoinGroupResponse.UNKNOWN_GENERATION_ID,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
@@ -838,7 +840,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
-      sendResponseCallback(Array[Byte](), ErrorMapping.GroupAuthorizationCode)
+      sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code)
     } else {
       coordinator.handleSyncGroup(
         syncGroupRequest.groupId(),
@@ -863,7 +865,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
-      val heartbeatResponse = new HeartbeatResponse(ErrorMapping.GroupAuthorizationCode)
+      val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
       requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
     }
     else {
@@ -914,7 +916,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
-      val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.GroupAuthorizationCode)
+      val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
       requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
     } else {
       // let the coordinator to handle leave-group

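The logging changes in KafkaApis replace ErrorMapping.exceptionNameFor(code) with Errors.forCode(code).exception.getClass.getName. A small sketch of that lookup follows, with a hypothetical exceptionNameFor helper; note that Errors.NONE carries no exception, so the lookup is only meaningful for non-zero codes, which is why the diff performs it on error paths only.

import org.apache.kafka.common.protocol.Errors

// Hypothetical helper mirroring the logging pattern above; it guards against
// Errors.NONE, whose attached exception is not meaningful.
object ExceptionNameSketch extends App {
  def exceptionNameFor(code: Short): String =
    if (code == Errors.NONE.code) "NONE"
    else Errors.forCode(code).exception.getClass.getName

  println(exceptionNameFor(Errors.TOPIC_AUTHORIZATION_FAILED.code))
  println(exceptionNameFor(Errors.NONE.code))
}
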
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 8120167..aaa6ea9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -46,11 +46,11 @@ import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
-import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
+import kafka.common.{InconsistentBrokerIdException, GenerateBrokerIdException}
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{GroupConfig, GroupCoordinator}
+import kafka.coordinator.GroupCoordinator
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -269,13 +269,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     }
 
     val secureAclsEnabled = JaasUtils.isZkSecurityEnabled() && config.zkEnableSecureAcls
-    
+
     if(config.zkEnableSecureAcls && !secureAclsEnabled) {
-      throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")    
+      throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")
     }
     if (chroot.length > 1) {
       val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
-      val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation, 
+      val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
                                               config.zkSessionTimeoutMs,
                                               config.zkConnectionTimeoutMs,
                                               secureAclsEnabled)
@@ -465,7 +465,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
               response = channel.receive()
               val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
-              if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
+              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null &&
                 shutdownResponse.partitionsRemaining.size == 0) {
                 shutdownSucceeded = true
                 info ("Controlled shutdown succeeded")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a9205f..f47a6aa 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -18,12 +18,12 @@
 package kafka.server
 
 import kafka.cluster.{BrokerEndPoint,Broker}
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
 import kafka.common.TopicAndPartition
 
 import kafka.api._
 import kafka.controller.KafkaController.StateChangeLogger
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, LeaderNotAvailableException}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import scala.collection.{Seq, Set, mutable}
 import kafka.utils.Logging
 import kafka.utils.CoreUtils._
@@ -75,12 +75,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
                 if (isrInfo.size < isr.size)
                   throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
                     isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
               } catch {
                 case e: Throwable =>
                   debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
                   new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+                    Errors.forException(e).code)
               }
           }
           topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)

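MetadataCache now maps a thrown exception to its wire code with Errors.forException instead of ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]). A sketch of that try/catch shape follows; lookupIsr is a hypothetical stand-in for the real ISR lookup, and only the error handling mirrors the diff.

import org.apache.kafka.common.errors.ReplicaNotAvailableException
import org.apache.kafka.common.protocol.Errors

object MetadataErrorSketch extends App {
  // Hypothetical stand-in for the ISR lookup; only the error handling mirrors the diff.
  def lookupIsr(available: Boolean): Seq[Int] =
    if (available) Seq(1, 2, 3)
    else throw new ReplicaNotAvailableException("In Sync Replica information not available for broker 2")

  def partitionErrorCode(available: Boolean): Short =
    try {
      lookupIsr(available)
      Errors.NONE.code
    } catch {
      case e: Throwable => Errors.forException(e).code
    }

  println(partitionErrorCode(available = true))   // 0 (NONE)
  println(partitionErrorCode(available = false))  // code of REPLICA_NOT_AVAILABLE
}
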
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 75e6bae..5b1276e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,6 +29,8 @@ import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
+InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -44,8 +46,8 @@ import scala.collection.JavaConverters._
  */
 case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) {
   def errorCode = error match {
-    case None => ErrorMapping.NoError
-    case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    case None => Errors.NONE.code
+    case Some(e) => Errors.forException(e).code
   }
 }
 
@@ -65,8 +67,8 @@ case class LogReadResult(info: FetchDataInfo,
                          error: Option[Throwable] = None) {
 
   def errorCode = error match {
-    case None => ErrorMapping.NoError
-    case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    case None => Errors.NONE.code
+    case Some(e) => Errors.forException(e).code
   }
 
   override def toString = {
@@ -216,7 +218,7 @@ class ReplicaManager(val config: KafkaConfig,
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
     stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
       deletePartition.toString, topic, partitionId))
-    val errorCode = ErrorMapping.NoError
+    val errorCode = Errors.NONE.code
     getPartition(topic, partitionId) match {
       case Some(partition) =>
         if(deletePartition) {
@@ -248,7 +250,7 @@ class ReplicaManager(val config: KafkaConfig,
       if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
         stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
           .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
-        (responseMap, ErrorMapping.StaleControllerEpochCode)
+        (responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
       } else {
         val partitions = stopReplicaRequest.partitions.asScala
         controllerEpoch = stopReplicaRequest.controllerEpoch
@@ -258,7 +260,7 @@ class ReplicaManager(val config: KafkaConfig,
           val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
           responseMap.put(topicPartition, errorCode)
         }
-        (responseMap, ErrorMapping.NoError)
+        (responseMap, Errors.NONE.code)
       }
     }
   }
@@ -393,7 +395,6 @@ class ReplicaManager(val config: KafkaConfig,
 
       // reject appending to internal topics if it is not allowed
       if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
-
         (topicAndPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
           Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)))))
@@ -433,11 +434,11 @@ class ReplicaManager(val config: KafkaConfig,
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
           case nle: NotLeaderForPartitionException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
-          case mtle: MessageSizeTooLargeException =>
+          case mtle: RecordTooLargeException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
-          case mstle: MessageSetSizeTooLargeException =>
+          case mstle: RecordBatchTooLargeException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
-          case imse : InvalidMessageSizeException =>
+          case imse: CorruptRecordException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
@@ -473,7 +474,7 @@ class ReplicaManager(val config: KafkaConfig,
     // check if this fetch request can be satisfied right away
     val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
     val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) =>
-      errorIncurred || (readResult.errorCode != ErrorMapping.NoError))
+      errorIncurred || (readResult.errorCode != Errors.NONE.code))
 
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
@@ -599,7 +600,7 @@ class ReplicaManager(val config: KafkaConfig,
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
           correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
         }
-        BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
+        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
         controllerEpoch = leaderAndISRRequest.controllerEpoch
@@ -619,7 +620,7 @@ class ReplicaManager(val config: KafkaConfig,
                 "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                   topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
-              responseMap.put(topicPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
             }
           } else {
             // Otherwise record the error code in response
@@ -627,7 +628,7 @@ class ReplicaManager(val config: KafkaConfig,
               "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                 topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
-            responseMap.put(topicPartition, ErrorMapping.StaleLeaderEpochCode)
+            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
           }
         }
 
@@ -654,7 +655,7 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.shutdownIdleFetcherThreads()
 
         onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-        BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.NoError)
+        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
       }
     }
   }
@@ -683,7 +684,7 @@ class ReplicaManager(val config: KafkaConfig,
         .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
 
     for (partition <- partitionState.keys)
-      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), ErrorMapping.NoError)
+      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
 
     val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
 
@@ -755,7 +756,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     for (partition <- partitionState.keys)
-      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), ErrorMapping.NoError)
+      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
 

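LogAppendResult and LogReadResult keep their Option[Throwable]-to-code mapping; only the lookup changes from ErrorMapping.codeFor to Errors.forException. A self-contained sketch follows, with a hypothetical ResultSketch case class standing in for the real result types.

import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.protocol.Errors

// Hypothetical stand-in for LogAppendResult/LogReadResult; errorCode mirrors the diff above.
case class ResultSketch(error: Option[Throwable] = None) {
  def errorCode: Short = error match {
    case None    => Errors.NONE.code
    case Some(e) => Errors.forException(e).code
  }
}

object ResultSketchDemo extends App {
  println(ResultSketch().errorCode)  // 0 (Errors.NONE)
  println(ResultSketch(Some(new UnknownTopicOrPartitionException("no such partition"))).errorCode)
  // code of UNKNOWN_TOPIC_OR_PARTITION
}
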
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 8af7614..5c01f34 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -19,12 +19,12 @@ package kafka.tools
 
 
 import joptsimple._
-import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.consumer.SimpleConsumer
 import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
-import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition}
-import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
+import org.apache.kafka.common.errors.BrokerNotAvailableException
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.JaasUtils
 import scala.collection._
 import kafka.client.ClientUtils
@@ -126,7 +126,7 @@ object ConsumerOffsetChecker extends Logging {
 
     parser.accepts("broker-info", "Print broker info")
     parser.accepts("help", "Print this message.")
-    
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
 
@@ -187,10 +187,10 @@ object ConsumerOffsetChecker extends Logging {
                 throw z
           }
         }
-        else if (offsetAndMetadata.error == ErrorMapping.NoError)
+        else if (offsetAndMetadata.error == Errors.NONE.code)
           offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
         else {
-          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
         }
       }
       channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index fd9daec..fe4968d 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -27,9 +27,10 @@ import java.util.regex.{PatternSyntaxException, Pattern}
 import kafka.api._
 import java.text.SimpleDateFormat
 import java.util.Date
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.utils._
 import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
+import org.apache.kafka.common.protocol.Errors
 
 /**
  *  For verifying the consistency among replicas.
@@ -230,7 +231,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
 
   private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
     offsetResponse.partitionErrorAndOffsets.filter {
-      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != ErrorMapping.NoError
+      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != Errors.NONE.code
     }.mkString
   }
 
@@ -397,4 +398,4 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
     verificationBarrier.await()
     debug("Done verification")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fccfdb6..a54cbef 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,7 @@ import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
 
 import kafka.cluster.EndPoint
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.coordinator.GroupCoordinator
 import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._