You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/07 06:25:26 UTC

[3/3] kafka git commit: KAFKA-2929: Migrate duplicate error mapping functionality

KAFKA-2929: Migrate duplicate error mapping functionality

Deprecates ErrorMapping.scala in core in favor of Errors.java in common.
Duplicated exceptions in core are deprecated as well, to ensure the mapping is correct.

Author: Grant Henke <gr...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #616 from granthenke/error-mapping


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9ff3f2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9ff3f2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9ff3f2e

Branch: refs/heads/trunk
Commit: a9ff3f2eced76a0aa13804439101c2897916e250
Parents: 991cafe
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Jan 6 21:24:19 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jan 6 21:24:19 2016 -0800

----------------------------------------------------------------------
 .../common/errors/AuthorizationException.java   |   4 +
 .../errors/ClusterAuthorizationException.java   |  26 +++
 .../common/errors/CorruptRecordException.java   |   6 +-
 .../errors/InvalidFetchSizeException.java       |  27 ++++
 .../common/errors/InvalidOffsetException.java   |  37 +++++
 .../errors/LeaderNotAvailableException.java     |   8 +-
 .../common/errors/OffsetMetadataTooLarge.java   |   4 +-
 .../errors/OffsetOutOfRangeException.java       |  36 +++++
 .../errors/ReplicaNotAvailableException.java    |  36 +++++
 .../apache/kafka/common/protocol/Errors.java    |  24 +--
 .../src/main/scala/kafka/admin/AdminUtils.scala |  22 +--
 .../kafka/admin/ConsumerGroupCommand.scala      |   6 +-
 .../kafka/api/ControlledShutdownRequest.scala   |   6 +-
 .../kafka/api/ControlledShutdownResponse.scala  |   5 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   6 +-
 .../main/scala/kafka/api/FetchResponse.scala    |   7 +-
 .../kafka/api/GroupCoordinatorRequest.scala     |   7 +-
 .../kafka/api/GroupCoordinatorResponse.scala    |  10 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |  10 +-
 .../scala/kafka/api/OffsetCommitResponse.scala  |   9 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |  10 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |  10 +-
 .../main/scala/kafka/api/OffsetResponse.scala   |   7 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   6 +-
 .../main/scala/kafka/api/ProducerResponse.scala |   6 +-
 .../main/scala/kafka/api/TopicMetadata.scala    |  39 ++---
 .../scala/kafka/api/TopicMetadataRequest.scala  |   5 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |  10 +-
 .../kafka/api/UpdateMetadataResponse.scala      |  10 +-
 .../main/scala/kafka/client/ClientUtils.scala   |  11 +-
 .../main/scala/kafka/cluster/Partition.scala    |  11 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |   2 +-
 .../kafka/common/OffsetMetadataAndError.scala   |   2 +-
 core/src/main/scala/kafka/common/Topic.scala    |  11 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  23 +--
 .../kafka/controller/KafkaController.scala      |   1 +
 .../coordinator/GroupMetadataManager.scala      |  44 ++---
 .../main/scala/kafka/log/FileMessageSet.scala   |  73 ++++-----
 core/src/main/scala/kafka/log/Log.scala         | 161 ++++++++++---------
 core/src/main/scala/kafka/log/LogSegment.scala  |  69 ++++----
 .../kafka/message/InvalidMessageException.scala |  11 +-
 .../kafka/producer/BrokerPartitionInfo.scala    |  17 +-
 .../producer/async/DefaultEventHandler.scala    |   8 +-
 .../kafka/security/auth/ResourceType.scala      |   9 +-
 .../kafka/server/AbstractFetcherThread.scala    |   5 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |   3 +-
 .../scala/kafka/server/DelayedProduce.scala     |  12 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  66 ++++----
 .../main/scala/kafka/server/KafkaServer.scala   |  12 +-
 .../main/scala/kafka/server/MetadataCache.scala |   8 +-
 .../scala/kafka/server/ReplicaManager.scala     |  37 ++---
 .../kafka/tools/ConsumerOffsetChecker.scala     |  12 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |   7 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   2 +-
 .../test/scala/other/kafka/StressTestLog.scala  |  21 +--
 .../scala/other/kafka/TestOffsetManager.scala   |   9 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   3 +-
 .../api/RequestResponseSerializationTest.scala  |  18 +--
 .../scala/unit/kafka/common/TopicTest.scala     |   2 +-
 .../integration/BaseTopicMetadataTest.scala     |  27 ++--
 .../scala/unit/kafka/log/LogManagerTest.scala   |  19 +--
 .../src/test/scala/unit/kafka/log/LogTest.scala |  22 +--
 .../unit/kafka/producer/AsyncProducerTest.scala |  11 +-
 .../unit/kafka/producer/ProducerTest.scala      |   5 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  22 +--
 .../unit/kafka/server/LeaderElectionTest.scala  |  13 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   9 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  51 +++---
 68 files changed, 727 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
index 7fc932d..d8348cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
@@ -18,4 +18,8 @@ public class AuthorizationException extends ApiException {
         super(message);
     }
 
+    public AuthorizationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java
new file mode 100644
index 0000000..9b8c74d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ClusterAuthorizationException extends AuthorizationException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ClusterAuthorizationException(String message) {
+        super(message);
+    }
+
+    public ClusterAuthorizationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
index c742580..56fa4b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -21,7 +21,7 @@ public class CorruptRecordException extends RetriableException {
     private static final long serialVersionUID = 1L;
 
     public CorruptRecordException() {
-        super("This message has failed its CRC checksum or is otherwise corrupt.");
+        super("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.");
     }
 
     public CorruptRecordException(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java
new file mode 100644
index 0000000..230f5a7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidFetchSizeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidFetchSizeException(String message) {
+        super(message);
+    }
+
+    public InvalidFetchSizeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java
new file mode 100644
index 0000000..135213d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the offset for a set of partitions is invalid (either undefined or out of range),
+ * and no reset policy has been configured.
+ * @see OffsetOutOfRangeException
+ */
+public class InvalidOffsetException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidOffsetException(String message) {
+        super(message);
+    }
+
+    public InvalidOffsetException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
index 9d7ebd4..bfdd4b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -24,4 +24,8 @@ public class LeaderNotAvailableException extends InvalidMetadataException {
         super(message);
     }
 
+    public LeaderNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
index 0be2f50..66b4fff 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..6b7a39d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * No reset policy has been defined, and the offsets for these partitions are either larger or smaller
+ * than the range of offsets the server has for the given partition.
+ */
+public class OffsetOutOfRangeException extends InvalidOffsetException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetOutOfRangeException(String message) {
+        super(message);
+    }
+
+    public OffsetOutOfRangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
new file mode 100644
index 0000000..d0338fa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class ReplicaNotAvailableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ReplicaNotAvailableException(String message) {
+        super(message);
+    }
+
+    public ReplicaNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ReplicaNotAvailableException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ad0de2f..2667bc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -20,13 +20,15 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupLoadInProgressException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -36,10 +38,13 @@ import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -49,19 +54,20 @@ import org.slf4j.LoggerFactory;
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
  * are thus part of the protocol. The names can be changed but the error code cannot.
- * 
+ *
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
     UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
     NONE(0, null),
     OFFSET_OUT_OF_RANGE(1,
-            new ApiException("The requested offset is not within the range of offsets maintained by the server.")),
+            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
     CORRUPT_MESSAGE(2,
-            new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+            new CorruptRecordException("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.")),
     UNKNOWN_TOPIC_OR_PARTITION(3,
             new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    // TODO: errorCode 4 for InvalidFetchSize
+    INVALID_FETCH_SIZE(4,
+            new InvalidFetchSizeException("The requested fetch size is invalid.")),
     LEADER_NOT_AVAILABLE(5,
             new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
     NOT_LEADER_FOR_PARTITION(6,
@@ -71,7 +77,7 @@ public enum Errors {
     BROKER_NOT_AVAILABLE(8,
             new BrokerNotAvailableException("The broker is not available.")),
     REPLICA_NOT_AVAILABLE(9,
-            new ApiException("The replica is not available for the requested topic-partition")),
+            new ReplicaNotAvailableException("The replica is not available for the requested topic-partition")),
     MESSAGE_TOO_LARGE(10,
             new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     STALE_CONTROLLER_EPOCH(11,
@@ -111,11 +117,11 @@ public enum Errors {
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid")),
     TOPIC_AUTHORIZATION_FAILED(29,
-            new AuthorizationException("Topic authorization failed.")),
+            new TopicAuthorizationException("Topic authorization failed.")),
     GROUP_AUTHORIZATION_FAILED(30,
-            new AuthorizationException("Group authorization failed.")),
+            new GroupAuthorizationException("Group authorization failed.")),
     CLUSTER_AUTHORIZATION_FAILED(31,
-            new AuthorizationException("Cluster authorization failed."));
+            new ClusterAuthorizationException("Cluster authorization failed."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 64527de..a8b3364 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -28,7 +28,8 @@ import kafka.api.{TopicMetadata, PartitionMetadata}
 
 import java.util.Random
 import java.util.Properties
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 
 import scala.Predef._
 import scala.collection._
@@ -37,7 +38,6 @@ import scala.collection.mutable
 import collection.Map
 import collection.Set
 
-import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
 object AdminUtils extends Logging {
@@ -159,7 +159,7 @@ object AdminUtils extends Logging {
     }
     ret.toMap
   }
-  
+
   def deleteTopic(zkUtils: ZkUtils, topic: String) {
     try {
       zkUtils.createPersistentPath(getDeleteTopicPath(topic))
@@ -169,7 +169,7 @@ object AdminUtils extends Logging {
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
-  
+
   def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
     zkUtils.getConsumersInGroup(group).nonEmpty
   }
@@ -224,13 +224,13 @@ object AdminUtils extends Logging {
     groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
   }
 
-  def topicExists(zkUtils: ZkUtils, topic: String): Boolean = 
+  def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
     zkUtils.zkClient.exists(getTopicPath(topic))
 
   def createTopic(zkUtils: ZkUtils,
                   topic: String,
-                  partitions: Int, 
-                  replicationFactor: Int, 
+                  partitions: Int,
+                  replicationFactor: Int,
                   topicConfig: Properties = new Properties) {
     val brokerList = zkUtils.getSortedBrokerList()
     val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
@@ -345,7 +345,7 @@ object AdminUtils extends Logging {
     val map = Map("version" -> 1, "config" -> configMap)
     zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
   }
-  
+
   /**
    * Read the entity (topic or client) config (if any) from zk
    */
@@ -426,18 +426,18 @@ object AdminUtils extends Logging {
           if(isrInfo.size < inSyncReplicas.size)
             throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
               inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
         } catch {
           case e: Throwable =>
             debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
             new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              Errors.forException(e).code)
         }
       }
       new TopicMetadata(topic, partitionMetadata)
     } else {
       // topic doesn't exist, send appropriate error code
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d71499e..c192a4f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -29,6 +29,8 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.BrokerNotAvailableException
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
@@ -223,11 +225,11 @@ object ConsumerGroupCommand {
                 .format(group, topicAndPartition))
           }
         }
-        else if (offsetAndMetadata.error == ErrorMapping.NoError)
+        else if (offsetAndMetadata.error == Errors.NONE.code)
           offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
         else
           println("Could not fetch offset from kafka for group %s partition %s due to %s."
-            .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+            .format(group, topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
       }
       channel.disconnect()
       offsetMap.toMap

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index f827d54..b875e3e 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -19,12 +19,12 @@ package kafka.api
 
 import java.nio.ByteBuffer
 
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicAndPartition}
 import kafka.api.ApiUtils._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 object ControlledShutdownRequest extends Logging {
   val CurrentVersion = 1.shortValue
@@ -68,7 +68,7 @@ case class ControlledShutdownRequest(versionId: Short,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
+    val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e).code, Set.empty[TopicAndPartition])
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 9ecdee7..02eeae1 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -18,8 +18,9 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
+import org.apache.kafka.common.protocol.Errors
 import collection.Set
 
 object ControlledShutdownResponse {
@@ -40,7 +41,7 @@ object ControlledShutdownResponse {
 
 
 case class ControlledShutdownResponse(correlationId: Int,
-                                      errorCode: Short = ErrorMapping.NoError,
+                                      errorCode: Short = Errors.NONE.code,
                                       partitionsRemaining: Set[TopicAndPartition])
   extends RequestOrResponse() {
   def sizeInBytes(): Int ={

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ca47e75..b43b8f4 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -19,14 +19,14 @@ package kafka.api
 
 import kafka.utils.nonthreadsafe
 import kafka.api.ApiUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
 import kafka.network.RequestChannel
 import kafka.message.MessageSet
 
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 import scala.collection.immutable.Map
 
@@ -148,7 +148,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val fetchResponsePartitionData = requestInfo.map {
       case (topicAndPartition, data) =>
-        (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
+        (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
     }
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 43ae38e..1066d7f 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -20,11 +20,12 @@ package kafka.api
 import java.nio.ByteBuffer
 import java.nio.channels.GatheringByteChannel
 
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.TopicAndPartition
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.api.ApiUtils._
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.network.{Send, MultiSend}
+import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
 
@@ -45,7 +46,7 @@ object FetchResponsePartitionData {
     4 /* messageSetSize */
 }
 
-case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, messages: MessageSet) {
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 }
 
@@ -246,7 +247,7 @@ case class FetchResponse(correlationId: Int,
 
   def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
 
-  def hasError = data.values.exists(_.error != ErrorMapping.NoError)
+  def hasError = data.values.exists(_.error != Errors.NONE.code)
 
   def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 7e7b55c..5f88136 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -19,10 +19,9 @@ package kafka.api
 
 import java.nio.ByteBuffer
 
-import kafka.common.ErrorMapping
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 object GroupCoordinatorRequest {
   val CurrentVersion = 0.shortValue
@@ -65,7 +64,7 @@ case class GroupCoordinatorRequest(group: String,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
@@ -78,4 +77,4 @@ case class GroupCoordinatorRequest(group: String,
     consumerMetadataRequest.append("; Group: " + group)
     consumerMetadataRequest.toString()
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
index 4cd7db8..83ba96e 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -19,25 +19,25 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
+import org.apache.kafka.common.protocol.Errors
 
 object GroupCoordinatorResponse {
   val CurrentVersion = 0
 
   private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-  
+
   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
     val broker = BrokerEndPoint.readFrom(buffer)
-    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+    val coordinatorOpt = if (errorCode == Errors.NONE.code)
       Some(broker)
     else
       None
 
     GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
   }
-  
+
 }
 
 case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
@@ -55,4 +55,4 @@ case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], err
   }
 
   def describe(details: Boolean) = toString
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 534eedf..e7cd952 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,11 +20,11 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 import scala.collection._
 
@@ -41,7 +41,7 @@ object OffsetCommitRequest extends Logging {
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
 
-    // Read the OffsetRequest
+    // Read the OffsetRequest 
     val groupId = readShortString(buffer)
 
     // version 1 and 2 specific fields
@@ -161,7 +161,7 @@ case class OffsetCommitRequest(groupId: String,
     })
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    val errorCode = Errors.forException(e).code
     val commitStatus = requestInfo.mapValues(_ => errorCode)
     val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 116547a..d4f6158 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,7 +20,8 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.utils.Logging
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.protocol.Errors
 
 object OffsetCommitResponse extends Logging {
   val CurrentVersion: Short = 0
@@ -47,7 +48,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
-  def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError }
+  def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != Errors.NONE.code }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
@@ -62,7 +63,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
     }
   }
 
-  override def sizeInBytes = 
+  override def sizeInBytes =
     4 + /* correlationId */
     4 + /* topic count */
     commitStatusGroupedByTopic.foldLeft(0)((count, partitionStatusMap) => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index f0a3c9c..d78fbf3 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,7 +24,7 @@ import kafka.common.{TopicAndPartition, _}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 1
@@ -59,7 +59,7 @@ case class OffsetFetchRequest(groupId: String,
     extends RequestOrResponse(Some(ApiKeys.OFFSET_FETCH.id)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
-
+  
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -82,7 +82,7 @@ case class OffsetFetchRequest(groupId: String,
     2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
-    shortStringLength(groupId) +
+    shortStringLength(groupId) + 
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
       count + shortStringLength(t._1) + /* topic */
@@ -93,7 +93,7 @@ case class OffsetFetchRequest(groupId: String,
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val responseMap = requestInfo.map {
       case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
-        ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        Errors.forException(e).code
       ))
     }.toMap
     val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index a2ef7eb..59181d1 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,10 +20,10 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 
 object OffsetRequest {
@@ -116,7 +116,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val partitionOffsetResponseMap = requestInfo.map {
       case (topicAndPartition, partitionOffsetRequest) =>
-        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil))
+        (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
@@ -133,4 +133,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
       offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetRequest.toString()
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 63c0899..766ff88 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -18,8 +18,9 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
+import org.apache.kafka.common.protocol.Errors
 
 
 object OffsetResponse {
@@ -46,7 +47,7 @@ object OffsetResponse {
 
 case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
   override def toString(): String = {
-    new String("error: " + ErrorMapping.exceptionFor(error).getClass.getName + " offsets: " + offsets.mkString)
+    new String("error: " + Errors.forCode(error).exception.getClass.getName + " offsets: " + offsets.mkString)
   }
 }
 
@@ -57,7 +58,7 @@ case class OffsetResponse(correlationId: Int,
 
   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
 
-  def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
+  def hasError = partitionErrorAndOffsets.values.exists(_.error != Errors.NONE.code)
 
   val sizeInBytes = {
     4 + /* correlation id */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index a697dc6..11f5009 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,7 +24,7 @@ import kafka.common._
 import kafka.message._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 object ProducerRequest {
   val CurrentVersion = 1.shortValue
@@ -135,7 +135,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     else {
       val producerResponseStatus = data.map {
         case (topicAndPartition, data) =>
-          (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+          (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 7719f30..7e745cf 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -18,8 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
+import org.apache.kafka.common.protocol.Errors
+
 import scala.collection.Map
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
 
 object ProducerResponse {
@@ -56,7 +58,7 @@ case class ProducerResponse(correlationId: Int,
    */
   private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
 
-  def hasError = status.values.exists(_.error != ErrorMapping.NoError)
+  def hasError = status.values.exists(_.error != Errors.NONE.code)
 
   val sizeInBytes = {
     val throttleTimeSize = if (requestVersion > 0) 4 else 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 7b56b31..97bbeea 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -22,9 +22,10 @@ import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
 import kafka.common._
+import org.apache.kafka.common.protocol.Errors
 
 object TopicMetadata {
-  
+
   val NoLeaderNodeId = -1
 
   def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = {
@@ -40,10 +41,10 @@ object TopicMetadata {
   }
 }
 
-case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = Errors.NONE.code) extends Logging {
   def sizeInBytes: Int = {
-    2 /* error code */ + 
-    shortStringLength(topic) + 
+    2 /* error code */ +
+    shortStringLength(topic) +
     4 + partitionsMetadata.map(_.sizeInBytes).sum /* size and partition data array */
   }
 
@@ -60,26 +61,26 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
   override def toString(): String = {
     val topicMetadataInfo = new StringBuilder
     topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
-    errorCode match {
-      case ErrorMapping.NoError =>
+    Errors.forCode(errorCode) match {
+      case Errors.NONE =>
         partitionsMetadata.foreach { partitionMetadata =>
-          partitionMetadata.errorCode match {
-            case ErrorMapping.NoError =>
+          Errors.forCode(partitionMetadata.errorCode) match {
+            case Errors.NONE =>
               topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %s".format(topic,
                 partitionMetadata.partitionId, partitionMetadata.toString()))
-            case ErrorMapping.ReplicaNotAvailableCode =>
+            case Errors.REPLICA_NOT_AVAILABLE =>
               // this error message means some replica other than the leader is not available. The consumer
               // doesn't care about non leader replicas, so ignore this
               topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %s".format(topic,
                 partitionMetadata.partitionId, partitionMetadata.toString()))
-            case _ =>
+            case error: Errors =>
               topicMetadataInfo.append("\nMetadata for partition [%s,%d] is not available due to %s".format(topic,
-                partitionMetadata.partitionId, ErrorMapping.exceptionFor(partitionMetadata.errorCode).getClass.getName))
+                partitionMetadata.partitionId, error.exception.getClass.getName))
           }
         }
-      case _ =>
+      case error: Errors =>
         topicMetadataInfo.append("\nNo partition metadata for topic %s due to %s".format(topic,
-                                 ErrorMapping.exceptionFor(errorCode).getClass.getName))
+          error.exception.getClass.getName))
     }
     topicMetadataInfo.append("}")
     topicMetadataInfo.toString()
@@ -108,16 +109,16 @@ object PartitionMetadata {
   }
 }
 
-case class PartitionMetadata(partitionId: Int, 
+case class PartitionMetadata(partitionId: Int,
                              leader: Option[BrokerEndPoint],
                              replicas: Seq[BrokerEndPoint],
                              isr: Seq[BrokerEndPoint] = Seq.empty,
-                             errorCode: Short = ErrorMapping.NoError) extends Logging {
+                             errorCode: Short = Errors.NONE.code) extends Logging {
   def sizeInBytes: Int = {
-    2 /* error code */ + 
-    4 /* partition id */ + 
-    4 /* leader */ + 
-    4 + 4 * replicas.size /* replica array */ + 
+    2 /* error code */ +
+    4 /* partition id */ +
+    4 /* leader */ +
+    4 + 4 * replicas.size /* replica array */ +
     4 + 4 * isr.size /* isr array */
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 656ff9f..be13586 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -20,11 +20,10 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.common.ErrorMapping
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 import scala.collection.mutable.ListBuffer
 
@@ -80,7 +79,7 @@ case class TopicMetadataRequest(versionId: Short,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val topicMetadata = topics.map {
-      topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+      topic => TopicMetadata(topic, Nil, Errors.forException(e).code)
     }
     val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 059c03e..f761125 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,10 +20,10 @@ import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
 import kafka.cluster.{Broker, BrokerEndPoint}
-import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
+import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 
 import scala.collection.Set
 
@@ -127,7 +127,7 @@ case class UpdateMetadataRequest (versionId: Short,
   }
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val errorResponse = new UpdateMetadataResponse(correlationId, Errors.forException(e).code)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
@@ -144,4 +144,4 @@ case class UpdateMetadataRequest (versionId: Short,
       updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     updateMetadataRequest.toString()
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
index 53f6067..3bdb3ca 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -17,12 +17,8 @@
 
 package kafka.api
 
-import kafka.common.{TopicAndPartition, ErrorMapping}
 import java.nio.ByteBuffer
-import kafka.api.ApiUtils._
-import collection.mutable.HashMap
-import collection.Map
-
+import org.apache.kafka.common.protocol.Errors
 
 object UpdateMetadataResponse {
   def readFrom(buffer: ByteBuffer): UpdateMetadataResponse = {
@@ -33,7 +29,7 @@ object UpdateMetadataResponse {
 }
 
 case class UpdateMetadataResponse(correlationId: Int,
-                                  errorCode: Short = ErrorMapping.NoError)
+                                  errorCode: Short = Errors.NONE.code)
   extends RequestOrResponse() {
   def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
 
@@ -43,4 +39,4 @@ case class UpdateMetadataResponse(correlationId: Int,
   }
 
   override def describe(details: Boolean):String = { toString }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 2f836c0..2093749 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -16,19 +16,18 @@
  */
 package kafka.client
 
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 
 import scala.collection._
 import kafka.cluster._
 import kafka.api._
 import kafka.producer._
-import kafka.common.{ErrorMapping, KafkaException}
+import kafka.common.KafkaException
 import kafka.utils.{CoreUtils, Logging}
 import java.util.Properties
 import util.Random
 import kafka.network.BlockingChannel
 import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
 import java.io.IOException
 
  /**
@@ -95,7 +94,7 @@ object ClientUtils extends Logging{
   }
 
   /**
-   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   * Parse a list of broker urls in the form host1:port1, host2:port2, ...
    */
   def parseBrokerList(brokerListStr: String): Seq[BrokerEndPoint] = {
     val brokersStr = CoreUtils.parseCsvList(brokerListStr)
@@ -155,7 +154,7 @@ object ClientUtils extends Logging{
            val response = queryChannel.receive()
            val consumerMetadataResponse =  GroupCoordinatorResponse.readFrom(response.payload())
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
-           if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
+           if (consumerMetadataResponse.errorCode == Errors.NONE.code)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt
            else {
              debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 916f3e7..1bfb144 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -29,8 +29,11 @@ import kafka.message.ByteBufferMessageSet
 
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
 
+
 import scala.collection.JavaConverters._
 
 import com.yammer.metrics.core.Gauge
@@ -325,14 +328,14 @@ class Partition(val topic: String,
           * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
           */
           if (minIsr <= curInSyncReplicas.size) {
-            (true, ErrorMapping.NoError)
+            (true, Errors.NONE.code)
           } else {
-            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
+            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
           }
         } else
-          (false, ErrorMapping.NoError)
+          (false, Errors.NONE.code)
       case None =>
-        (false, ErrorMapping.NotLeaderForPartitionCode)
+        (false, Errors.NOT_LEADER_FOR_PARTITION.code)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index e0ebe94..e20b88c 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -69,8 +69,8 @@ object ErrorMapping {
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
       classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
-      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
+      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index a94e58c..4d9ae40 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -70,7 +70,7 @@ object OffsetMetadataAndError {
   val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
   val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
 
-  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError)
+  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE.code)
 
   def apply(error: Short) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 982955e..55d2bdb 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -20,7 +20,6 @@ package kafka.common
 import util.matching.Regex
 import kafka.coordinator.GroupCoordinator
 
-
 object Topic {
   val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 255
@@ -30,17 +29,17 @@ object Topic {
 
   def validate(topic: String) {
     if (topic.length <= 0)
-      throw new InvalidTopicException("topic name is illegal, can't be empty")
+      throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty")
     else if (topic.equals(".") || topic.equals(".."))
-      throw new InvalidTopicException("topic name cannot be \".\" or \"..\"")
+      throw new org.apache.kafka.common.errors.InvalidTopicException("topic name cannot be \".\" or \"..\"")
     else if (topic.length > maxNameLength)
-      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+      throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
 
     rgx.findFirstIn(topic) match {
       case Some(t) =>
         if (!t.equals(topic))
-          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
-      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, '.', '_' and '-'")
+          throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
+      case None => throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, '.', '_' and '-'")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 6fa410a..f776578 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -36,7 +36,8 @@ import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
@@ -274,7 +275,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
                                                   "timestamp" -> timestamp))
     val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
-                                                    consumerRegistryDir + "/" + consumerIdString, 
+                                                    consumerRegistryDir + "/" + consumerIdString,
                                                     consumerRegistrationInfo,
                                                     zkUtils.zkConnection.getZookeeper,
                                                     false)
@@ -356,23 +357,23 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
                 offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
 
-                  if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) {
+                  if (errorCode == Errors.NONE.code && config.dualCommitEnabled) {
                     val offset = offsetsToCommit(topicPartition).offset
                     commitOffsetToZooKeeper(topicPartition, offset)
                   }
 
                   (folded._1 || // update commitFailed
-                    errorCode != ErrorMapping.NoError,
+                    errorCode != Errors.NONE.code,
 
                     folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
-                      (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+                      (errorCode != Errors.NONE.code && errorCode != Errors.OFFSET_METADATA_TOO_LARGE.code),
 
                     folded._3 || // update shouldRefreshCoordinator
-                      errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
-                      errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+                      errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code ||
+                      errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code,
 
                     // update error count
-                    folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+                    folded._4 + (if (errorCode != Errors.NONE.code) 1 else 0))
                 }
               }
               debug(errorCount + " errors in offset commit response.")
@@ -442,8 +443,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             val (leaderChanged, loadInProgress) =
               offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
-                (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
-                 folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
+                 folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
               }
 
             if (leaderChanged) {
@@ -463,7 +464,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                 val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
                   val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
                   val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
-                  (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError))
+                  (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE.code))
                 }
                 Some(OffsetFetchResponse(mostRecentOffsets))
               }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e7eef36..103f6cf 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -18,6 +18,7 @@ package kafka.controller
 
 import java.util
 
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 00f3275..79e318c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -190,23 +190,23 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(groupMetadataPartition)
 
       var responseCode = Errors.NONE.code
-      if (status.error != ErrorMapping.NoError) {
+      if (status.error != Errors.NONE.code) {
         debug("Metadata from group %s with generation %d failed when appending to log due to %s"
-          .format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+          .format(group.groupId, generationId, Errors.forCode(status.error).exception.getClass.getName))
 
         // transform the log append error code to the corresponding the commit status error code
-        responseCode = if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) {
+        responseCode = if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
           Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-        } else if (status.error == ErrorMapping.NotLeaderForPartitionCode) {
+        } else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code) {
           Errors.NOT_COORDINATOR_FOR_GROUP.code
-        } else if (status.error == ErrorMapping.RequestTimedOutCode) {
+        } else if (status.error == Errors.REQUEST_TIMED_OUT.code) {
           Errors.REBALANCE_IN_PROGRESS.code
-        } else if (status.error == ErrorMapping.MessageSizeTooLargeCode
-          || status.error == ErrorMapping.MessageSetSizeTooLargeCode
-          || status.error == ErrorMapping.InvalidFetchSizeCode) {
+        } else if (status.error == Errors.MESSAGE_TOO_LARGE.code
+          || status.error == Errors.RECORD_LIST_TOO_LARGE.code
+          || status.error == Errors.INVALID_FETCH_SIZE.code) {
 
           error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
-            .format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+            .format(group.groupId, generationId, Errors.forCode(status.error).exception.getClass.getName))
 
           Errors.UNKNOWN.code
         } else {
@@ -271,23 +271,23 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
-        if (status.error == ErrorMapping.NoError) {
+        if (status.error == Errors.NONE.code) {
           filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
             putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
           }
-          ErrorMapping.NoError
+          Errors.NONE.code
         } else {
           debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
-            .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+            .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.error).exception.getClass.getName))
 
           // transform the log append error code to the corresponding the commit status error code
-          if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
-            ErrorMapping.ConsumerCoordinatorNotAvailableCode
-          else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
-            ErrorMapping.NotCoordinatorForConsumerCode
-          else if (status.error == ErrorMapping.MessageSizeTooLargeCode
-            || status.error == ErrorMapping.MessageSetSizeTooLargeCode
-            || status.error == ErrorMapping.InvalidFetchSizeCode)
+          if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+            Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
+          else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code)
+            Errors.NOT_COORDINATOR_FOR_GROUP.code
+          else if (status.error == Errors.MESSAGE_TOO_LARGE.code
+            || status.error == Errors.RECORD_LIST_TOO_LARGE.code
+            || status.error == Errors.INVALID_FETCH_SIZE.code)
             Errors.INVALID_COMMIT_OFFSET_SIZE.code
           else
             status.error
@@ -299,7 +299,7 @@ class GroupMetadataManager(val brokerId: Int,
         if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
           (topicAndPartition, responseCode)
         else
-          (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+          (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
       }
 
       // finally trigger the callback logic passed from the API layer
@@ -320,7 +320,7 @@ class GroupMetadataManager(val brokerId: Int,
       if (topicPartitions.isEmpty) {
         // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
         offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
-          (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
+          (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
         }.toMap
       } else {
         topicPartitions.map { topicAndPartition =>
@@ -516,7 +516,7 @@ class GroupMetadataManager(val brokerId: Int,
     if (offsetAndMetadata == null)
       OffsetMetadataAndError.NoOffset
     else
-      OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
+      OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
   }
 
   /**