You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/06 10:46:06 UTC
kafka git commit: MINOR: additional refactoring around the use of Errors
Repository: kafka
Updated Branches:
refs/heads/trunk 5b013d9cd -> f111f2a71
MINOR: additional refactoring around the use of Errors
A couple of updates were missed in the [PR](https://github.com/apache/kafka/pull/2475) that replaced the use of error codes with Errors objects.
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2635 from vahidhashemian/minor/Errors_refactoring_leftover
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f111f2a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f111f2a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f111f2a7
Branch: refs/heads/trunk
Commit: f111f2a7167f3abcb0b0e53ac22f9f7bb367766e
Parents: 5b013d9
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Mon Mar 6 10:46:04 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Mar 6 10:46:04 2017 +0000
----------------------------------------------------------------------
.../kafka/common/OffsetMetadataAndError.scala | 2 +-
.../kafka/coordinator/GroupCoordinator.scala | 4 +--
.../kafka/server/ReplicaFetcherThread.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 30 ++++++++++----------
4 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 46c0881..e0aa46d 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -58,7 +58,7 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors
def metadata = offsetMetadata.metadata
- override def toString = "[%s,ErrorCode %d]".format(offsetMetadata, error)
+ override def toString = "[%s, Error=%s]".format(offsetMetadata, error)
}
object OffsetMetadataAndError {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 891896a..1735dc8 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -474,8 +474,8 @@ class GroupCoordinator(val brokerId: Int,
if (!isActive.get) {
(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
} else {
- val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
- (errorCode, groupManager.currentGroups.map(_.overview).toList)
+ val error = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
+ (error, groupManager.currentGroups.map(_.overview).toList)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7fb02a3..3b6adec 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -278,7 +278,7 @@ class ReplicaFetcherThread(name: String,
partitionData.offset
else
partitionData.offsets.get(0)
- case errorCode => throw errorCode.exception
+ case error => throw error.exception
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3368c09..22efa1f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -291,20 +291,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
for ((key, request) <- requestKeyToRequest) {
removeAllAcls
val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
- sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
val resourceToAcls = RequestKeysToAcls(key)
resourceToAcls.get(topicResource).map { acls =>
val describeAcls = TopicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
- sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
removeAllAcls
}
for ((resource, acls) <- resourceToAcls)
addAndVerifyAcls(acls, resource)
- sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
}
}
@@ -330,20 +330,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
for ((key, request) <- requestKeyToRequest) {
removeAllAcls
val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
- sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
val resourceToAcls = RequestKeysToAcls(key)
resourceToAcls.get(topicResource).map { acls =>
val describeAcls = TopicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
- sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
removeAllAcls
}
for ((resource, acls) <- resourceToAcls)
addAndVerifyAcls(acls, resource)
- sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
}
}
@@ -805,18 +805,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
- def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys,
- request: AbstractRequest,
- resources: Set[ResourceType],
- isAuthorized: Boolean,
- isAuthorizedTopicDescribe: Boolean,
- topicExists: Boolean = true): AbstractResponse = {
+ def sendRequestAndVerifyResponseError(apiKey: ApiKeys,
+ request: AbstractRequest,
+ resources: Set[ResourceType],
+ isAuthorized: Boolean,
+ isAuthorizedTopicDescribe: Boolean,
+ topicExists: Boolean = true): AbstractResponse = {
val resp = connectAndSend(request, apiKey)
val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse]
val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
- val authorizationErrorCodes = resources.flatMap { resourceType =>
+ val authorizationErrors = resources.flatMap { resourceType =>
if (resourceType == Topic) {
if (isAuthorized)
Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error)
@@ -831,9 +831,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
if (topicExists)
if (isAuthorized)
- assertFalse(s"$apiKey should be allowed. Found unexpected authorization error $error", authorizationErrorCodes.contains(error))
+ assertFalse(s"$apiKey should be allowed. Found unexpected authorization error $error", authorizationErrors.contains(error))
else
- assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrorCodes", authorizationErrorCodes.contains(error))
+ assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error))
else if (resources == Set(Topic))
assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
else