You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/06 10:46:06 UTC

kafka git commit: MINOR: additional refactoring around the use of Errors

Repository: kafka
Updated Branches:
  refs/heads/trunk 5b013d9cd -> f111f2a71


MINOR: additional refactoring around the use of Errors

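A couple of updates were missed in the [PR](https://github.com/apache/kafka/pull/2475) that replaced the use of error codes with `Errors` objects: the `%d` format specifier in `OffsetMetadataAndError.toString`, which was still being applied to an `Errors` value, and several identifiers that were still named after error codes (`errorCode`, `authorizationErrorCodes`, `sendRequestAndVerifyResponseErrorCode`).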

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2635 from vahidhashemian/minor/Errors_refactoring_leftover


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f111f2a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f111f2a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f111f2a7

Branch: refs/heads/trunk
Commit: f111f2a7167f3abcb0b0e53ac22f9f7bb367766e
Parents: 5b013d9
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Mon Mar 6 10:46:04 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Mar 6 10:46:04 2017 +0000

----------------------------------------------------------------------
 .../kafka/common/OffsetMetadataAndError.scala   |  2 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  4 +--
 .../kafka/server/ReplicaFetcherThread.scala     |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 30 ++++++++++----------
 4 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 46c0881..e0aa46d 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -58,7 +58,7 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors
 
   def metadata = offsetMetadata.metadata
 
-  override def toString = "[%s,ErrorCode %d]".format(offsetMetadata, error)
+  override def toString = "[%s, Error=%s]".format(offsetMetadata, error)
 }
 
 object OffsetMetadataAndError {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 891896a..1735dc8 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -474,8 +474,8 @@ class GroupCoordinator(val brokerId: Int,
     if (!isActive.get) {
       (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
-      val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
-      (errorCode, groupManager.currentGroups.map(_.overview).toList)
+      val error = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
+      (error, groupManager.currentGroups.map(_.overview).toList)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7fb02a3..3b6adec 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -278,7 +278,7 @@ class ReplicaFetcherThread(name: String,
           partitionData.offset
         else
           partitionData.offsets.get(0)
-      case errorCode => throw errorCode.exception
+      case error => throw error.exception
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f111f2a7/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3368c09..22efa1f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -291,20 +291,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
       val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
 
       val resourceToAcls = RequestKeysToAcls(key)
       resourceToAcls.get(topicResource).map { acls =>
         val describeAcls = TopicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
-        sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
+        sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
         removeAllAcls
       }
 
       for ((resource, acls) <- resourceToAcls)
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
     }
   }
 
@@ -330,20 +330,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
       val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
 
       val resourceToAcls = RequestKeysToAcls(key)
       resourceToAcls.get(topicResource).map { acls =>
         val describeAcls = TopicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
-        sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
+        sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
         removeAllAcls
       }
 
       for ((resource, acls) <- resourceToAcls)
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
     }
   }
 
@@ -805,18 +805,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }
 
-  def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys,
-                                            request: AbstractRequest,
-                                            resources: Set[ResourceType],
-                                            isAuthorized: Boolean,
-                                            isAuthorizedTopicDescribe: Boolean,
-                                            topicExists: Boolean = true): AbstractResponse = {
+  def sendRequestAndVerifyResponseError(apiKey: ApiKeys,
+                                        request: AbstractRequest,
+                                        resources: Set[ResourceType],
+                                        isAuthorized: Boolean,
+                                        isAuthorizedTopicDescribe: Boolean,
+                                        topicExists: Boolean = true): AbstractResponse = {
     val resp = connectAndSend(request, apiKey)
     val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
       null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse]
     val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
 
-    val authorizationErrorCodes = resources.flatMap { resourceType =>
+    val authorizationErrors = resources.flatMap { resourceType =>
       if (resourceType == Topic) {
         if (isAuthorized)
           Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error)
@@ -831,9 +831,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     if (topicExists)
       if (isAuthorized)
-        assertFalse(s"$apiKey should be allowed. Found unexpected authorization error $error", authorizationErrorCodes.contains(error))
+        assertFalse(s"$apiKey should be allowed. Found unexpected authorization error $error", authorizationErrors.contains(error))
       else
-        assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrorCodes", authorizationErrorCodes.contains(error))
+        assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error))
     else if (resources == Set(Topic))
       assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
     else