You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/07 06:25:26 UTC
[3/3] kafka git commit: KAFKA-2929: Migrate duplicate error mapping
functionality
KAFKA-2929: Migrate duplicate error mapping functionality
Deprecates ErrorMapping.scala in core in favor of Errors.java in common.
Duplicated exceptions in core are deprecated as well, to ensure the mapping is correct.
Author: Grant Henke <gr...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #616 from granthenke/error-mapping
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9ff3f2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9ff3f2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9ff3f2e
Branch: refs/heads/trunk
Commit: a9ff3f2eced76a0aa13804439101c2897916e250
Parents: 991cafe
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Jan 6 21:24:19 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jan 6 21:24:19 2016 -0800
----------------------------------------------------------------------
.../common/errors/AuthorizationException.java | 4 +
.../errors/ClusterAuthorizationException.java | 26 +++
.../common/errors/CorruptRecordException.java | 6 +-
.../errors/InvalidFetchSizeException.java | 27 ++++
.../common/errors/InvalidOffsetException.java | 37 +++++
.../errors/LeaderNotAvailableException.java | 8 +-
.../common/errors/OffsetMetadataTooLarge.java | 4 +-
.../errors/OffsetOutOfRangeException.java | 36 +++++
.../errors/ReplicaNotAvailableException.java | 36 +++++
.../apache/kafka/common/protocol/Errors.java | 24 +--
.../src/main/scala/kafka/admin/AdminUtils.scala | 22 +--
.../kafka/admin/ConsumerGroupCommand.scala | 6 +-
.../kafka/api/ControlledShutdownRequest.scala | 6 +-
.../kafka/api/ControlledShutdownResponse.scala | 5 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 6 +-
.../main/scala/kafka/api/FetchResponse.scala | 7 +-
.../kafka/api/GroupCoordinatorRequest.scala | 7 +-
.../kafka/api/GroupCoordinatorResponse.scala | 10 +-
.../scala/kafka/api/OffsetCommitRequest.scala | 10 +-
.../scala/kafka/api/OffsetCommitResponse.scala | 9 +-
.../scala/kafka/api/OffsetFetchRequest.scala | 10 +-
.../main/scala/kafka/api/OffsetRequest.scala | 10 +-
.../main/scala/kafka/api/OffsetResponse.scala | 7 +-
.../main/scala/kafka/api/ProducerRequest.scala | 6 +-
.../main/scala/kafka/api/ProducerResponse.scala | 6 +-
.../main/scala/kafka/api/TopicMetadata.scala | 39 ++---
.../scala/kafka/api/TopicMetadataRequest.scala | 5 +-
.../scala/kafka/api/UpdateMetadataRequest.scala | 10 +-
.../kafka/api/UpdateMetadataResponse.scala | 10 +-
.../main/scala/kafka/client/ClientUtils.scala | 11 +-
.../main/scala/kafka/cluster/Partition.scala | 11 +-
.../main/scala/kafka/common/ErrorMapping.scala | 2 +-
.../kafka/common/OffsetMetadataAndError.scala | 2 +-
core/src/main/scala/kafka/common/Topic.scala | 11 +-
.../consumer/ZookeeperConsumerConnector.scala | 23 +--
.../kafka/controller/KafkaController.scala | 1 +
.../coordinator/GroupMetadataManager.scala | 44 ++---
.../main/scala/kafka/log/FileMessageSet.scala | 73 ++++-----
core/src/main/scala/kafka/log/Log.scala | 161 ++++++++++---------
core/src/main/scala/kafka/log/LogSegment.scala | 69 ++++----
.../kafka/message/InvalidMessageException.scala | 11 +-
.../kafka/producer/BrokerPartitionInfo.scala | 17 +-
.../producer/async/DefaultEventHandler.scala | 8 +-
.../kafka/security/auth/ResourceType.scala | 9 +-
.../kafka/server/AbstractFetcherThread.scala | 5 +-
.../main/scala/kafka/server/DelayedFetch.scala | 3 +-
.../scala/kafka/server/DelayedProduce.scala | 12 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 66 ++++----
.../main/scala/kafka/server/KafkaServer.scala | 12 +-
.../main/scala/kafka/server/MetadataCache.scala | 8 +-
.../scala/kafka/server/ReplicaManager.scala | 37 ++---
.../kafka/tools/ConsumerOffsetChecker.scala | 12 +-
.../kafka/tools/ReplicaVerificationTool.scala | 7 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../test/scala/other/kafka/StressTestLog.scala | 21 +--
.../scala/other/kafka/TestOffsetManager.scala | 9 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 3 +-
.../api/RequestResponseSerializationTest.scala | 18 +--
.../scala/unit/kafka/common/TopicTest.scala | 2 +-
.../integration/BaseTopicMetadataTest.scala | 27 ++--
.../scala/unit/kafka/log/LogManagerTest.scala | 19 +--
.../src/test/scala/unit/kafka/log/LogTest.scala | 22 +--
.../unit/kafka/producer/AsyncProducerTest.scala | 11 +-
.../unit/kafka/producer/ProducerTest.scala | 5 +-
.../unit/kafka/producer/SyncProducerTest.scala | 22 +--
.../unit/kafka/server/LeaderElectionTest.scala | 13 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 9 +-
.../unit/kafka/server/OffsetCommitTest.scala | 51 +++---
68 files changed, 727 insertions(+), 521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
index 7fc932d..d8348cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
@@ -18,4 +18,8 @@ public class AuthorizationException extends ApiException {
super(message);
}
+ public AuthorizationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java
new file mode 100644
index 0000000..9b8c74d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ClusterAuthorizationException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ClusterAuthorizationException extends AuthorizationException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ClusterAuthorizationException(String message) {
+ super(message);
+ }
+
+ public ClusterAuthorizationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
index c742580..56fa4b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -21,7 +21,7 @@ public class CorruptRecordException extends RetriableException {
private static final long serialVersionUID = 1L;
public CorruptRecordException() {
- super("This message has failed its CRC checksum or is otherwise corrupt.");
+ super("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.");
}
public CorruptRecordException(String message) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java
new file mode 100644
index 0000000..230f5a7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidFetchSizeException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidFetchSizeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InvalidFetchSizeException(String message) {
+ super(message);
+ }
+
+ public InvalidFetchSizeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java
new file mode 100644
index 0000000..135213d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidOffsetException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the offset for a set of partitions is invalid (either undefined or out of range),
+ * and no reset policy has been configured.
+ * @see OffsetOutOfRangeException
+ */
+public class InvalidOffsetException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InvalidOffsetException(String message) {
+ super(message);
+ }
+
+ public InvalidOffsetException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
index 9d7ebd4..bfdd4b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -24,4 +24,8 @@ public class LeaderNotAvailableException extends InvalidMetadataException {
super(message);
}
+ public LeaderNotAvailableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
index 0be2f50..66b4fff 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..6b7a39d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * No reset policy has been defined, and the offsets for these partitions are either larger or smaller
+ * than the range of offsets the server has for the given partition.
+ */
+public class OffsetOutOfRangeException extends InvalidOffsetException {
+
+ private static final long serialVersionUID = 1L;
+
+ public OffsetOutOfRangeException(String message) {
+ super(message);
+ }
+
+ public OffsetOutOfRangeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
new file mode 100644
index 0000000..d0338fa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class ReplicaNotAvailableException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ReplicaNotAvailableException(String message) {
+ super(message);
+ }
+
+ public ReplicaNotAvailableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ReplicaNotAvailableException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ad0de2f..2667bc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -20,13 +20,15 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ControllerMovedException;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupLoadInProgressException;
import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -36,10 +38,13 @@ import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -49,19 +54,20 @@ import org.slf4j.LoggerFactory;
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
- *
+ *
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
NONE(0, null),
OFFSET_OUT_OF_RANGE(1,
- new ApiException("The requested offset is not within the range of offsets maintained by the server.")),
+ new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
CORRUPT_MESSAGE(2,
- new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+ new CorruptRecordException("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.")),
UNKNOWN_TOPIC_OR_PARTITION(3,
new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
- // TODO: errorCode 4 for InvalidFetchSize
+ INVALID_FETCH_SIZE(4,
+ new InvalidFetchSizeException("The requested fetch size is invalid.")),
LEADER_NOT_AVAILABLE(5,
new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6,
@@ -71,7 +77,7 @@ public enum Errors {
BROKER_NOT_AVAILABLE(8,
new BrokerNotAvailableException("The broker is not available.")),
REPLICA_NOT_AVAILABLE(9,
- new ApiException("The replica is not available for the requested topic-partition")),
+ new ReplicaNotAvailableException("The replica is not available for the requested topic-partition")),
MESSAGE_TOO_LARGE(10,
new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
STALE_CONTROLLER_EPOCH(11,
@@ -111,11 +117,11 @@ public enum Errors {
INVALID_COMMIT_OFFSET_SIZE(28,
new ApiException("The committing offset data size is not valid")),
TOPIC_AUTHORIZATION_FAILED(29,
- new AuthorizationException("Topic authorization failed.")),
+ new TopicAuthorizationException("Topic authorization failed.")),
GROUP_AUTHORIZATION_FAILED(30,
- new AuthorizationException("Group authorization failed.")),
+ new GroupAuthorizationException("Group authorization failed.")),
CLUSTER_AUTHORIZATION_FAILED(31,
- new AuthorizationException("Cluster authorization failed."));
+ new ClusterAuthorizationException("Cluster authorization failed."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 64527de..a8b3364 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -28,7 +28,8 @@ import kafka.api.{TopicMetadata, PartitionMetadata}
import java.util.Random
import java.util.Properties
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import scala.Predef._
import scala.collection._
@@ -37,7 +38,6 @@ import scala.collection.mutable
import collection.Map
import collection.Set
-import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
@@ -159,7 +159,7 @@ object AdminUtils extends Logging {
}
ret.toMap
}
-
+
def deleteTopic(zkUtils: ZkUtils, topic: String) {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
@@ -169,7 +169,7 @@ object AdminUtils extends Logging {
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
-
+
def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
zkUtils.getConsumersInGroup(group).nonEmpty
}
@@ -224,13 +224,13 @@ object AdminUtils extends Logging {
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
}
- def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
+ def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))
def createTopic(zkUtils: ZkUtils,
topic: String,
- partitions: Int,
- replicationFactor: Int,
+ partitions: Int,
+ replicationFactor: Int,
topicConfig: Properties = new Properties) {
val brokerList = zkUtils.getSortedBrokerList()
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
@@ -345,7 +345,7 @@ object AdminUtils extends Logging {
val map = Map("version" -> 1, "config" -> configMap)
zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
}
-
+
/**
* Read the entity (topic or client) config (if any) from zk
*/
@@ -426,18 +426,18 @@ object AdminUtils extends Logging {
if(isrInfo.size < inSyncReplicas.size)
throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
- new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+ new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
} catch {
case e: Throwable =>
debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
- ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ Errors.forException(e).code)
}
}
new TopicMetadata(topic, partitionMetadata)
} else {
// topic doesn't exist, send appropriate error code
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d71499e..c192a4f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -29,6 +29,8 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.BrokerNotAvailableException
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
@@ -223,11 +225,11 @@ object ConsumerGroupCommand {
.format(group, topicAndPartition))
}
}
- else if (offsetAndMetadata.error == ErrorMapping.NoError)
+ else if (offsetAndMetadata.error == Errors.NONE.code)
offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
else
println("Could not fetch offset from kafka for group %s partition %s due to %s."
- .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+ .format(group, topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
}
channel.disconnect()
offsetMap.toMap
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index f827d54..b875e3e 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -19,12 +19,12 @@ package kafka.api
import java.nio.ByteBuffer
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicAndPartition}
import kafka.api.ApiUtils._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
object ControlledShutdownRequest extends Logging {
val CurrentVersion = 1.shortValue
@@ -68,7 +68,7 @@ case class ControlledShutdownRequest(versionId: Short,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
+ val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e).code, Set.empty[TopicAndPartition])
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 9ecdee7..02eeae1 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -18,8 +18,9 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
+import org.apache.kafka.common.protocol.Errors
import collection.Set
object ControlledShutdownResponse {
@@ -40,7 +41,7 @@ object ControlledShutdownResponse {
case class ControlledShutdownResponse(correlationId: Int,
- errorCode: Short = ErrorMapping.NoError,
+ errorCode: Short = Errors.NONE.code,
partitionsRemaining: Set[TopicAndPartition])
extends RequestOrResponse() {
def sizeInBytes(): Int ={
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ca47e75..b43b8f4 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -19,14 +19,14 @@ package kafka.api
import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import kafka.network.RequestChannel
import kafka.message.MessageSet
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import scala.collection.immutable.Map
@@ -148,7 +148,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val fetchResponsePartitionData = requestInfo.map {
case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
+ (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
}
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 43ae38e..1066d7f 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -20,11 +20,12 @@ package kafka.api
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.TopicAndPartition
import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.api.ApiUtils._
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.network.{Send, MultiSend}
+import org.apache.kafka.common.protocol.Errors
import scala.collection._
@@ -45,7 +46,7 @@ object FetchResponsePartitionData {
4 /* messageSetSize */
}
-case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, messages: MessageSet) {
val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
}
@@ -246,7 +247,7 @@ case class FetchResponse(correlationId: Int,
def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
- def hasError = data.values.exists(_.error != ErrorMapping.NoError)
+ def hasError = data.values.exists(_.error != Errors.NONE.code)
def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 7e7b55c..5f88136 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -19,10 +19,9 @@ package kafka.api
import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
object GroupCoordinatorRequest {
val CurrentVersion = 0.shortValue
@@ -65,7 +64,7 @@ case class GroupCoordinatorRequest(group: String,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
// return ConsumerCoordinatorNotAvailable for all uncaught errors
- val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+ val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, correlationId)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
@@ -78,4 +77,4 @@ case class GroupCoordinatorRequest(group: String,
consumerMetadataRequest.append("; Group: " + group)
consumerMetadataRequest.toString()
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
index 4cd7db8..83ba96e 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -19,25 +19,25 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
+import org.apache.kafka.common.protocol.Errors
object GroupCoordinatorResponse {
val CurrentVersion = 0
private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-
+
def readFrom(buffer: ByteBuffer) = {
val correlationId = buffer.getInt
val errorCode = buffer.getShort
val broker = BrokerEndPoint.readFrom(buffer)
- val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+ val coordinatorOpt = if (errorCode == Errors.NONE.code)
Some(broker)
else
None
GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
}
-
+
}
case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
@@ -55,4 +55,4 @@ case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], err
}
def describe(details: Boolean) = toString
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 534eedf..e7cd952 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,11 +20,11 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
-import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import scala.collection._
@@ -41,7 +41,7 @@ object OffsetCommitRequest extends Logging {
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
- // Read the OffsetRequest
+ // Read the OffsetRequest
val groupId = readShortString(buffer)
// version 1 and 2 specific fields
@@ -161,7 +161,7 @@ case class OffsetCommitRequest(groupId: String,
})
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ val errorCode = Errors.forException(e).code
val commitStatus = requestInfo.mapValues(_ => errorCode)
val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 116547a..d4f6158 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,7 +20,8 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Logging
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.protocol.Errors
object OffsetCommitResponse extends Logging {
val CurrentVersion: Short = 0
@@ -47,7 +48,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
- def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError }
+ def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != Errors.NONE.code }
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
@@ -62,7 +63,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
}
}
- override def sizeInBytes =
+ override def sizeInBytes =
4 + /* correlationId */
4 + /* topic count */
commitStatusGroupedByTopic.foldLeft(0)((count, partitionStatusMap) => {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index f0a3c9c..d78fbf3 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -24,7 +24,7 @@ import kafka.common.{TopicAndPartition, _}
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
object OffsetFetchRequest extends Logging {
val CurrentVersion: Short = 1
@@ -59,7 +59,7 @@ case class OffsetFetchRequest(groupId: String,
extends RequestOrResponse(Some(ApiKeys.OFFSET_FETCH.id)) {
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
-
+
def writeTo(buffer: ByteBuffer) {
// Write envelope
buffer.putShort(versionId)
@@ -82,7 +82,7 @@ case class OffsetFetchRequest(groupId: String,
2 + /* versionId */
4 + /* correlationId */
shortStringLength(clientId) +
- shortStringLength(groupId) +
+ shortStringLength(groupId) +
4 + /* topic count */
requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
count + shortStringLength(t._1) + /* topic */
@@ -93,7 +93,7 @@ case class OffsetFetchRequest(groupId: String,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val responseMap = requestInfo.map {
case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
- ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ Errors.forException(e).code
))
}.toMap
val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index a2ef7eb..59181d1 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,10 +20,10 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
object OffsetRequest {
@@ -116,7 +116,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val partitionOffsetResponseMap = requestInfo.map {
case (topicAndPartition, partitionOffsetRequest) =>
- (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil))
+ (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
}
val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
@@ -133,4 +133,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
offsetRequest.toString()
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 63c0899..766ff88 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -18,8 +18,9 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
+import org.apache.kafka.common.protocol.Errors
object OffsetResponse {
@@ -46,7 +47,7 @@ object OffsetResponse {
case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
override def toString(): String = {
- new String("error: " + ErrorMapping.exceptionFor(error).getClass.getName + " offsets: " + offsets.mkString)
+ new String("error: " + Errors.forCode(error).exception.getClass.getName + " offsets: " + offsets.mkString)
}
}
@@ -57,7 +58,7 @@ case class OffsetResponse(correlationId: Int,
lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
- def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
+ def hasError = partitionErrorAndOffsets.values.exists(_.error != Errors.NONE.code)
val sizeInBytes = {
4 + /* correlation id */
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index a697dc6..11f5009 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -24,7 +24,7 @@ import kafka.common._
import kafka.message._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
object ProducerRequest {
val CurrentVersion = 1.shortValue
@@ -135,7 +135,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
else {
val producerResponseStatus = data.map {
case (topicAndPartition, data) =>
- (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+ (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l))
}
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 7719f30..7e745cf 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -18,8 +18,10 @@
package kafka.api
import java.nio.ByteBuffer
+import org.apache.kafka.common.protocol.Errors
+
import scala.collection.Map
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
object ProducerResponse {
@@ -56,7 +58,7 @@ case class ProducerResponse(correlationId: Int,
*/
private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
- def hasError = status.values.exists(_.error != ErrorMapping.NoError)
+ def hasError = status.values.exists(_.error != Errors.NONE.code)
val sizeInBytes = {
val throttleTimeSize = if (requestVersion > 0) 4 else 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 7b56b31..97bbeea 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -22,9 +22,10 @@ import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.utils.Logging
import kafka.common._
+import org.apache.kafka.common.protocol.Errors
object TopicMetadata {
-
+
val NoLeaderNodeId = -1
def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = {
@@ -40,10 +41,10 @@ object TopicMetadata {
}
}
-case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = Errors.NONE.code) extends Logging {
def sizeInBytes: Int = {
- 2 /* error code */ +
- shortStringLength(topic) +
+ 2 /* error code */ +
+ shortStringLength(topic) +
4 + partitionsMetadata.map(_.sizeInBytes).sum /* size and partition data array */
}
@@ -60,26 +61,26 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
override def toString(): String = {
val topicMetadataInfo = new StringBuilder
topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
- errorCode match {
- case ErrorMapping.NoError =>
+ Errors.forCode(errorCode) match {
+ case Errors.NONE =>
partitionsMetadata.foreach { partitionMetadata =>
- partitionMetadata.errorCode match {
- case ErrorMapping.NoError =>
+ Errors.forCode(partitionMetadata.errorCode) match {
+ case Errors.NONE =>
topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %s".format(topic,
partitionMetadata.partitionId, partitionMetadata.toString()))
- case ErrorMapping.ReplicaNotAvailableCode =>
+ case Errors.REPLICA_NOT_AVAILABLE =>
// this error message means some replica other than the leader is not available. The consumer
// doesn't care about non leader replicas, so ignore this
topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %s".format(topic,
partitionMetadata.partitionId, partitionMetadata.toString()))
- case _ =>
+ case error: Errors =>
topicMetadataInfo.append("\nMetadata for partition [%s,%d] is not available due to %s".format(topic,
- partitionMetadata.partitionId, ErrorMapping.exceptionFor(partitionMetadata.errorCode).getClass.getName))
+ partitionMetadata.partitionId, error.exception.getClass.getName))
}
}
- case _ =>
+ case error: Errors =>
topicMetadataInfo.append("\nNo partition metadata for topic %s due to %s".format(topic,
- ErrorMapping.exceptionFor(errorCode).getClass.getName))
+ error.exception.getClass.getName))
}
topicMetadataInfo.append("}")
topicMetadataInfo.toString()
@@ -108,16 +109,16 @@ object PartitionMetadata {
}
}
-case class PartitionMetadata(partitionId: Int,
+case class PartitionMetadata(partitionId: Int,
leader: Option[BrokerEndPoint],
replicas: Seq[BrokerEndPoint],
isr: Seq[BrokerEndPoint] = Seq.empty,
- errorCode: Short = ErrorMapping.NoError) extends Logging {
+ errorCode: Short = Errors.NONE.code) extends Logging {
def sizeInBytes: Int = {
- 2 /* error code */ +
- 4 /* partition id */ +
- 4 /* leader */ +
- 4 + 4 * replicas.size /* replica array */ +
+ 2 /* error code */ +
+ 4 /* partition id */ +
+ 4 /* leader */ +
+ 4 + 4 * replicas.size /* replica array */ +
4 + 4 * isr.size /* isr array */
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 656ff9f..be13586 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -20,11 +20,10 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
-import kafka.common.ErrorMapping
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import scala.collection.mutable.ListBuffer
@@ -80,7 +79,7 @@ case class TopicMetadataRequest(versionId: Short,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val topicMetadata = topics.map {
- topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ topic => TopicMetadata(topic, Nil, Errors.forException(e).code)
}
val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 059c03e..f761125 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,10 +20,10 @@ import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.cluster.{Broker, BrokerEndPoint}
-import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
+import kafka.common.{KafkaException, TopicAndPartition}
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import scala.collection.Set
@@ -127,7 +127,7 @@ case class UpdateMetadataRequest (versionId: Short,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ val errorResponse = new UpdateMetadataResponse(correlationId, Errors.forException(e).code)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
@@ -144,4 +144,4 @@ case class UpdateMetadataRequest (versionId: Short,
updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
updateMetadataRequest.toString()
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
index 53f6067..3bdb3ca 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -17,12 +17,8 @@
package kafka.api
-import kafka.common.{TopicAndPartition, ErrorMapping}
import java.nio.ByteBuffer
-import kafka.api.ApiUtils._
-import collection.mutable.HashMap
-import collection.Map
-
+import org.apache.kafka.common.protocol.Errors
object UpdateMetadataResponse {
def readFrom(buffer: ByteBuffer): UpdateMetadataResponse = {
@@ -33,7 +29,7 @@ object UpdateMetadataResponse {
}
case class UpdateMetadataResponse(correlationId: Int,
- errorCode: Short = ErrorMapping.NoError)
+ errorCode: Short = Errors.NONE.code)
extends RequestOrResponse() {
def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
@@ -43,4 +39,4 @@ case class UpdateMetadataResponse(correlationId: Int,
}
override def describe(details: Boolean):String = { toString }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 2f836c0..2093749 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -16,19 +16,18 @@
*/
package kafka.client
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import scala.collection._
import kafka.cluster._
import kafka.api._
import kafka.producer._
-import kafka.common.{ErrorMapping, KafkaException}
+import kafka.common.KafkaException
import kafka.utils.{CoreUtils, Logging}
import java.util.Properties
import util.Random
import kafka.network.BlockingChannel
import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
import java.io.IOException
/**
@@ -95,7 +94,7 @@ object ClientUtils extends Logging{
}
/**
- * Parse a list of broker urls in the form host1:port1, host2:port2, ...
+ * Parse a list of broker urls in the form host1:port1, host2:port2, ...
*/
def parseBrokerList(brokerListStr: String): Seq[BrokerEndPoint] = {
val brokersStr = CoreUtils.parseCsvList(brokerListStr)
@@ -155,7 +154,7 @@ object ClientUtils extends Logging{
val response = queryChannel.receive()
val consumerMetadataResponse = GroupCoordinatorResponse.readFrom(response.payload())
debug("Consumer metadata response: " + consumerMetadataResponse.toString)
- if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
+ if (consumerMetadataResponse.errorCode == Errors.NONE.code)
coordinatorOpt = consumerMetadataResponse.coordinatorOpt
else {
debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 916f3e7..1bfb144 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -29,8 +29,11 @@ import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LeaderAndIsrRequest
+
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
@@ -325,14 +328,14 @@ class Partition(val topic: String,
* in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
*/
if (minIsr <= curInSyncReplicas.size) {
- (true, ErrorMapping.NoError)
+ (true, Errors.NONE.code)
} else {
- (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
+ (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
}
} else
- (false, ErrorMapping.NoError)
+ (false, Errors.NONE.code)
case None =>
- (false, ErrorMapping.NotLeaderForPartitionCode)
+ (false, Errors.NOT_LEADER_FOR_PARTITION.code)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index e0ebe94..e20b88c 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -69,8 +69,8 @@ object ErrorMapping {
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
- classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
+ classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index a94e58c..4d9ae40 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -70,7 +70,7 @@ object OffsetMetadataAndError {
val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
- def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError)
+ def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE.code)
def apply(error: Short) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 982955e..55d2bdb 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -20,7 +20,6 @@ package kafka.common
import util.matching.Regex
import kafka.coordinator.GroupCoordinator
-
object Topic {
val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 255
@@ -30,17 +29,17 @@ object Topic {
def validate(topic: String) {
if (topic.length <= 0)
- throw new InvalidTopicException("topic name is illegal, can't be empty")
+ throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty")
else if (topic.equals(".") || topic.equals(".."))
- throw new InvalidTopicException("topic name cannot be \".\" or \"..\"")
+ throw new org.apache.kafka.common.errors.InvalidTopicException("topic name cannot be \".\" or \"..\"")
else if (topic.length > maxNameLength)
- throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+ throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
rgx.findFirstIn(topic) match {
case Some(t) =>
if (!t.equals(topic))
- throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
- case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
+ throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
+ case None => throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 6fa410a..f776578 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -36,7 +36,8 @@ import kafka.utils.CoreUtils.inLock
import kafka.utils.ZkUtils._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.Watcher.Event.KeeperState
@@ -274,7 +275,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
"timestamp" -> timestamp))
val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
- consumerRegistryDir + "/" + consumerIdString,
+ consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo,
zkUtils.zkConnection.getZookeeper,
false)
@@ -356,23 +357,23 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
- if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) {
+ if (errorCode == Errors.NONE.code && config.dualCommitEnabled) {
val offset = offsetsToCommit(topicPartition).offset
commitOffsetToZooKeeper(topicPartition, offset)
}
(folded._1 || // update commitFailed
- errorCode != ErrorMapping.NoError,
+ errorCode != Errors.NONE.code,
folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
- (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+ (errorCode != Errors.NONE.code && errorCode != Errors.OFFSET_METADATA_TOO_LARGE.code),
folded._3 || // update shouldRefreshCoordinator
- errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
- errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+ errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code ||
+ errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code,
// update error count
- folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+ folded._4 + (if (errorCode != Errors.NONE.code) 1 else 0))
}
}
debug(errorCount + " errors in offset commit response.")
@@ -442,8 +443,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val (leaderChanged, loadInProgress) =
offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
- (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
- folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+ (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
+ folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
}
if (leaderChanged) {
@@ -463,7 +464,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
- (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError))
+ (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE.code))
}
Some(OffsetFetchResponse(mostRecentOffsets))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e7eef36..103f6cf 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -18,6 +18,7 @@ package kafka.controller
import java.util
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 00f3275..79e318c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -190,23 +190,23 @@ class GroupMetadataManager(val brokerId: Int,
val status = responseStatus(groupMetadataPartition)
var responseCode = Errors.NONE.code
- if (status.error != ErrorMapping.NoError) {
+ if (status.error != Errors.NONE.code) {
debug("Metadata from group %s with generation %d failed when appending to log due to %s"
- .format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+ .format(group.groupId, generationId, Errors.forCode(status.error).exception.getClass.getName))
// transform the log append error code to the corresponding commit status error code
- responseCode = if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) {
+ responseCode = if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
- } else if (status.error == ErrorMapping.NotLeaderForPartitionCode) {
+ } else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code) {
Errors.NOT_COORDINATOR_FOR_GROUP.code
- } else if (status.error == ErrorMapping.RequestTimedOutCode) {
+ } else if (status.error == Errors.REQUEST_TIMED_OUT.code) {
Errors.REBALANCE_IN_PROGRESS.code
- } else if (status.error == ErrorMapping.MessageSizeTooLargeCode
- || status.error == ErrorMapping.MessageSetSizeTooLargeCode
- || status.error == ErrorMapping.InvalidFetchSizeCode) {
+ } else if (status.error == Errors.MESSAGE_TOO_LARGE.code
+ || status.error == Errors.RECORD_LIST_TOO_LARGE.code
+ || status.error == Errors.INVALID_FETCH_SIZE.code) {
error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
- .format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+ .format(group.groupId, generationId, Errors.forCode(status.error).exception.getClass.getName))
Errors.UNKNOWN.code
} else {
@@ -271,23 +271,23 @@ class GroupMetadataManager(val brokerId: Int,
val status = responseStatus(offsetTopicPartition)
val responseCode =
- if (status.error == ErrorMapping.NoError) {
+ if (status.error == Errors.NONE.code) {
filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
}
- ErrorMapping.NoError
+ Errors.NONE.code
} else {
debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
- .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+ .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.error).exception.getClass.getName))
// transform the log append error code to the corresponding commit status error code
- if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
- ErrorMapping.ConsumerCoordinatorNotAvailableCode
- else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
- ErrorMapping.NotCoordinatorForConsumerCode
- else if (status.error == ErrorMapping.MessageSizeTooLargeCode
- || status.error == ErrorMapping.MessageSetSizeTooLargeCode
- || status.error == ErrorMapping.InvalidFetchSizeCode)
+ if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
+ else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code)
+ Errors.NOT_COORDINATOR_FOR_GROUP.code
+ else if (status.error == Errors.MESSAGE_TOO_LARGE.code
+ || status.error == Errors.RECORD_LIST_TOO_LARGE.code
+ || status.error == Errors.INVALID_FETCH_SIZE.code)
Errors.INVALID_COMMIT_OFFSET_SIZE.code
else
status.error
@@ -299,7 +299,7 @@ class GroupMetadataManager(val brokerId: Int,
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicAndPartition, responseCode)
else
- (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+ (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
}
// finally trigger the callback logic passed from the API layer
@@ -320,7 +320,7 @@ class GroupMetadataManager(val brokerId: Int,
if (topicPartitions.isEmpty) {
// Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
- (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
+ (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
}.toMap
} else {
topicPartitions.map { topicAndPartition =>
@@ -516,7 +516,7 @@ class GroupMetadataManager(val brokerId: Int,
if (offsetAndMetadata == null)
OffsetMetadataAndError.NoOffset
else
- OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
+ OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
}
/**