You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/17 13:04:45 UTC
[1/2] kafka git commit: MINOR: A few cleanups in KafkaApis and
TransactionMarkerChannelManager
Repository: kafka
Updated Branches:
refs/heads/0.11.0 b1313935f -> bc6a3bc6f
MINOR: A few cleanups in KafkaApis and TransactionMarkerChannelManager
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Colin P. Mccabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3171 from hachikuji/minor-txn-channel-cleanups
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63605779
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63605779
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63605779
Branch: refs/heads/0.11.0
Commit: 63605779ef67fb39487cf0487c8ac4caa8d39cbc
Parents: b131393
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Jun 17 02:03:25 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 17 14:04:32 2017 +0100
----------------------------------------------------------------------
.../common/requests/DeleteAclsResponse.java | 8 +
.../kafka/common/InterBrokerSendThread.scala | 29 +--
.../TransactionMarkerChannelManager.scala | 22 +--
.../scala/kafka/security/SecurityUtils.scala | 48 +++++
.../kafka/security/auth/PermissionType.scala | 5 -
.../src/main/scala/kafka/server/KafkaApis.scala | 187 ++++++-------------
.../common/InterBrokerSendThreadTest.scala | 16 +-
.../TransactionMarkerChannelManagerTest.scala | 20 +-
8 files changed, 151 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 796e200..94cd6aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -49,6 +49,10 @@ public class DeleteAclsResponse extends AbstractResponse {
this.acl = acl;
}
+ public AclDeletionResult(AclBinding acl) {
+ this(null, acl);
+ }
+
public ApiException exception() {
return exception;
}
@@ -72,6 +76,10 @@ public class DeleteAclsResponse extends AbstractResponse {
this.deletions = deletions;
}
+ public AclFilterResponse(Collection<AclDeletionResult> deletions) {
+ this(null, deletions);
+ }
+
public Throwable throwable() {
return throwable;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 886e41c..06158b2 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -27,24 +27,27 @@ import org.apache.kafka.common.utils.Time
/**
* Class for inter-broker send thread that utilize a non-blocking network client.
*/
-class InterBrokerSendThread(name: String,
- networkClient: NetworkClient,
- requestGenerator: () => Iterable[RequestAndCompletionHandler],
- time: Time,
- isInterruptible: Boolean = true)
+abstract class InterBrokerSendThread(name: String,
+ networkClient: NetworkClient,
+ time: Time,
+ isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
- // visible for testing
- def generateRequests(): Iterable[RequestAndCompletionHandler] = requestGenerator()
+ def generateRequests(): Iterable[RequestAndCompletionHandler]
+
+ override def shutdown(): Unit = {
+ initiateShutdown()
+ // wake up the thread in case it is blocked inside poll
+ networkClient.wakeup()
+ awaitShutdown()
+ }
override def doWork() {
val now = time.milliseconds()
var pollTimeout = Long.MaxValue
try {
- val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator()
-
- for (request: RequestAndCompletionHandler <- requestsToSend) {
+ for (request: RequestAndCompletionHandler <- generateRequests()) {
val destination = Integer.toString(request.destination.id())
val completionHandler = request.handler
val clientRequest = networkClient.newClientRequest(destination,
@@ -79,6 +82,10 @@ class InterBrokerSendThread(name: String,
throw new FatalExitError()
}
}
+
+ def wakeup(): Unit = networkClient.wakeup()
+
}
-case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], handler: RequestCompletionHandler)
\ No newline at end of file
+case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest],
+ handler: RequestCompletionHandler)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 22f01c1..9c3ffd9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -122,16 +122,12 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
networkClient: NetworkClient,
txnStateManager: TransactionStateManager,
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
- time: Time) extends Logging with KafkaMetricsGroup {
+ time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup {
this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "
private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
- private val txnMarkerSendThread: InterBrokerSendThread = {
- new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
- }
-
private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new ConcurrentHashMap[Int, TxnMarkerQueue]().asScala
private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
@@ -152,15 +148,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
)
- def start(): Unit = {
- txnMarkerSendThread.start()
- }
+ override def generateRequests() = drainQueuedTransactionMarkers()
- def shutdown(): Unit = {
- txnMarkerSendThread.initiateShutdown()
- // wake up the thread in case it is blocked inside poll
- networkClient.wakeup()
- txnMarkerSendThread.awaitShutdown()
+ override def shutdown(): Unit = {
+ super.shutdown()
txnMarkerPurgatory.shutdown()
markersQueuePerBroker.clear()
}
@@ -173,9 +164,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
// visible for testing
private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker
- // visible for testing
- private[transaction] def senderThread = txnMarkerSendThread
-
private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) {
val brokerId = broker.id
@@ -369,7 +357,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
}
- networkClient.wakeup()
+ wakeup()
}
def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/SecurityUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
new file mode 100644
index 0000000..bbfc42c
--- /dev/null
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security
+
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
+import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+import scala.util.Try
+
+
+object SecurityUtils {
+
+ def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] = {
+ for {
+ resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
+ principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+ operation <- Try(Operation.fromJava(filter.entryFilter.operation))
+ permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
+ resource = Resource(resourceType, filter.resourceFilter.name)
+ acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
+ } yield (resource, acl)
+ }
+
+ def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
+ val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name)
+ val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
+ acl.operation.toJava, acl.permissionType.toJava)
+ new AclBinding(adminResource, entry)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index 686c60b..c603351 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -21,11 +21,6 @@ import org.apache.kafka.common.acl.AclPermissionType
import scala.util.{Failure, Success, Try}
-/**
- * PermissionType.
- */
-
-
sealed trait PermissionType extends BaseEnum {
val toJava: AclPermissionType
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 27eb816..337c740 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,6 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
import kafka.network.{RequestChannel, RequestOrResponseSend}
+import kafka.security.SecurityUtils
import kafka.security.auth._
import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors._
@@ -50,13 +51,12 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import scala.collection._
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
/**
@@ -1782,70 +1782,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- /**
- * Convert an ACL binding filter to a Scala object.
- * All ACL and resource fields must be specified (no UNKNOWN, ANY, or null fields are allowed.)
- *
- * @param filter The binding filter as a Java object.
- * @return The binding filter as a scala object, or an exception if there was an error
- * converting the Java object.
- */
- def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = {
- filter.resourceFilter().resourceType() match {
- case AdminResourceType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN resource type"))
- case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type"))
- case _ => {}
- }
- val resourceType: ResourceType = try {
- ResourceType.fromJava(filter.resourceFilter.resourceType)
- } catch {
- case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type"))
- }
- val principal: KafkaPrincipal = try {
- KafkaPrincipal.fromString(filter.entryFilter.principal)
- } catch {
- case throwable: Throwable => return Failure(new InvalidRequestException("Invalid principal"))
- }
- filter.entryFilter().operation() match {
- case AclOperation.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN operation type"))
- case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type"))
- case _ => {}
- }
- val operation: Operation = try {
- Operation.fromJava(filter.entryFilter.operation)
- } catch {
- case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage))
- }
- filter.entryFilter().permissionType() match {
- case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid UNKNOWN permission type")
- case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY permission type")
- case _ => {}
- }
- val permissionType: PermissionType = try {
- PermissionType.fromJava(filter.entryFilter.permissionType)
- } catch {
- case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage))
- }
- return Success((Resource(resourceType, filter.resourceFilter().name()), Acl(principal, permissionType,
- filter.entryFilter().host(), operation)))
- }
-
- /**
- * Convert a Scala ACL binding to a Java object.
- *
- * @param acl The binding as a Scala object.
- * @return The binding as a Java object.
- */
- def toJava(acl: (Resource, Acl)) : AclBinding = {
- acl match {
- case (resource, acl) =>
- val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name)
- val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
- acl.operation.toJava, acl.permissionType.toJava)
- return new AclBinding(adminResource, entry)
- }
- }
-
def handleCreateAcls(request: RequestChannel.Request): Unit = {
authorizeClusterAlter(request)
val createAclsRequest = request.body[CreateAclsRequest]
@@ -1855,11 +1791,9 @@ class KafkaApis(val requestChannel: RequestChannel,
createAclsRequest.getErrorResponse(requestThrottleMs,
new SecurityDisabledException("No Authorizer is configured on the broker.")))
case Some(auth) =>
- val errors = mutable.HashMap[Int, Throwable]()
- for (i <- 0 until createAclsRequest.aclCreations.size) {
- val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter)
- result match {
- case Failure(throwable) => errors.put(i, throwable)
+ val aclCreationResults = createAclsRequest.aclCreations.asScala.map { aclCreation =>
+ SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) match {
+ case Failure(throwable) => new AclCreationResponse(throwable)
case Success((resource, acl)) => try {
if (resource.resourceType.equals(Cluster) &&
!resource.name.equals(Resource.ClusterResourceName))
@@ -1868,25 +1802,19 @@ class KafkaApis(val requestChannel: RequestChannel,
if (resource.name.isEmpty)
throw new InvalidRequestException("Invalid empty resource name")
auth.addAcls(immutable.Set(acl), resource)
- if (logger.isDebugEnabled)
- logger.debug(s"Added acl $acl to $resource")
+
+ logger.debug(s"Added acl $acl to $resource")
+
+ new AclCreationResponse(null)
} catch {
- case throwable : Throwable => if (logger.isDebugEnabled) {
- logger.debug(s"Failed to add acl $acl to $resource", throwable)
- }
- errors.put(i, throwable)
+ case throwable: Throwable =>
+ logger.debug(s"Failed to add acl $acl to $resource", throwable)
+ new AclCreationResponse(throwable)
}
}
}
- val aclCreationResults = new java.util.ArrayList[AclCreationResponse]
- for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
- errors.get(i) match {
- case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable))
- case None => aclCreationResults.add(new AclCreationResponse(null))
- }
- }
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new CreateAclsResponse(requestThrottleMs, aclCreationResults))
+ new CreateAclsResponse(requestThrottleMs, aclCreationResults.asJava))
}
}
@@ -1899,60 +1827,53 @@ class KafkaApis(val requestChannel: RequestChannel,
deleteAclsRequest.getErrorResponse(requestThrottleMs,
new SecurityDisabledException("No Authorizer is configured on the broker.")))
case Some(auth) =>
- val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]()
- val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
- for (i <- 0 to deleteAclsRequest.filters().size - 1) {
- toDelete.put(i, new ListBuffer[(Resource, Acl)]())
- }
- if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) {
- // Delete based on filters that may match more than one ACL.
- val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
- aclMap.foreach { case (resource, acls) =>
- acls.foreach { acl =>
- val binding = new AclBinding(new AdminResource(AdminResourceType.
- fromString(resource.resourceType.toString), resource.name),
- new AccessControlEntry(acl.principal.toString(), acl.host.toString(),
- acl.operation.toJava, acl.permissionType.toJava))
- for (i <- 0 to deleteAclsRequest.filters().size - 1) {
- val filter = deleteAclsRequest.filters().get(i)
- if (filter.matches(binding)) {
- toDelete.get(i).get += ((resource, acl))
- }
- }
+ val filters = deleteAclsRequest.filters.asScala
+ val filterResponseMap = mutable.Map[Int, AclFilterResponse]()
+ val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]()
+
+ if (filters.forall(_.matchesAtMostOne)) {
+ // Delete based on a list of ACL fixtures.
+ for ((filter, i) <- filters.zipWithIndex) {
+ SecurityUtils.convertToResourceAndAcl(filter) match {
+ case Failure(throwable) => filterResponseMap.put(i, new AclFilterResponse(throwable, Seq.empty.asJava))
+ case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture))
}
}
} else {
- // Delete based on a list of ACL fixtures.
- for (i <- 0 to deleteAclsRequest.filters().size - 1) {
- toScala(deleteAclsRequest.filters().get(i)) match {
- case Failure(throwable) => filterResponseMap.put(i,
- new AclFilterResponse(throwable, Collections.emptySet[AclDeletionResult]()))
- case Success(fixture) => toDelete.put(i, ListBuffer(fixture))
- }
+ // Delete based on filters that may match more than one ACL.
+ val aclMap = auth.getAcls()
+ val filtersWithIndex = filters.zipWithIndex
+ for ((resource, acls) <- aclMap; acl <- acls) {
+ val binding = new AclBinding(
+ new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+ new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
+ acl.permissionType.toJava))
+
+ for ((filter, i) <- filtersWithIndex if filter.matches(binding))
+ toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl))
}
}
- for (i <- toDelete.keys) {
- val deletionResults = new util.ArrayList[AclDeletionResult]()
- for (acls <- toDelete.get(i)) {
- for ((resource, acl) <- acls) {
- try {
- if (auth.removeAcls(immutable.Set(acl), resource)) {
- deletionResults.add(new AclDeletionResult(null, toJava((resource, acl))))
- }
- } catch {
- case throwable: Throwable => deletionResults.add(new AclDeletionResult(
- new UnknownServerException("Failed to delete ACL: " + throwable.toString),
- toJava((resource, acl))))
- }
+
+ for ((i, acls) <- toDelete) {
+ val deletionResults = acls.flatMap { case (resource, acl) =>
+ val aclBinding = SecurityUtils.convertToAclBinding(resource, acl)
+ try {
+ if (auth.removeAcls(immutable.Set(acl), resource))
+ Some(new AclDeletionResult(aclBinding))
+ else None
+ } catch {
+ case throwable: Throwable =>
+ Some(new AclDeletionResult(new UnknownServerException(s"Failed to delete ACL $acl: $throwable"),
+ aclBinding))
}
- }
- filterResponseMap.put(i, new AclFilterResponse(null, deletionResults))
- }
- val filterResponses = new util.ArrayList[AclFilterResponse]
- for (i <- 0 to deleteAclsRequest.filters().size() - 1) {
- filterResponses.add(filterResponseMap.getOrElse(i,
- new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]())))
+ }.asJava
+
+ filterResponseMap.put(i, new AclFilterResponse(deletionResults))
}
+
+ val filterResponses = filters.indices.map { i =>
+ filterResponseMap.getOrElse(i, new AclFilterResponse(Seq.empty.asJava))
+ }.asJava
sendResponseMaybeThrottle(request, requestThrottleMs => new DeleteAclsResponse(requestThrottleMs, filterResponses))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index a508c41..c6ebdd1 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -34,7 +34,9 @@ class InterBrokerSendThreadTest {
@Test
def shouldNotSendAnythingWhenNoRequests(): Unit = {
- val sendThread = new InterBrokerSendThread("name", networkClient, () => mutable.Iterable.empty, time)
+ val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override def generateRequests() = mutable.Iterable.empty
+ }
// poll is always called but there should be no further invocations on NetworkClient
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
@@ -52,9 +54,9 @@ class InterBrokerSendThreadTest {
val request = new StubRequestBuilder()
val node = new Node(1, "", 8080)
val handler = RequestAndCompletionHandler(node, request, completionHandler)
- val sendThread = new InterBrokerSendThread("name", networkClient, () => {
- List[RequestAndCompletionHandler](handler)
- }, time)
+ val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override def generateRequests() = List[RequestAndCompletionHandler](handler)
+ }
val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, handler.handler)
@@ -86,9 +88,9 @@ class InterBrokerSendThreadTest {
val request = new StubRequestBuilder
val node = new Node(1, "", 8080)
val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler)
- val sendThread = new InterBrokerSendThread("name", networkClient, () => {
- List[RequestAndCompletionHandler](requestAndCompletionHandler)
- }, time)
+ val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
+ }
val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestAndCompletionHandler.handler)
http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 01a350b..9e7fb13 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -73,8 +73,6 @@ class TransactionMarkerChannelManagerTest {
txnMarkerPurgatory,
time)
- private val senderThread = channelManager.senderThread
-
private def mockCache(): Unit = {
EasyMock.expect(txnStateManager.partitionFor(transactionalId1))
.andReturn(txnTopicPartition1)
@@ -93,7 +91,7 @@ class TransactionMarkerChannelManagerTest {
@Test
def shouldGenerateEmptyMapWhenNoRequestsOutstanding(): Unit = {
- assertTrue(senderThread.generateRequests().isEmpty)
+ assertTrue(channelManager.generateRequests().isEmpty)
}
@Test
@@ -131,12 +129,12 @@ class TransactionMarkerChannelManagerTest {
val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
- val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+ val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
}.toMap
assertEquals(Map(broker1 -> expectedBroker1Request, broker2 -> expectedBroker2Request), requests)
- assertTrue(senderThread.generateRequests().isEmpty)
+ assertTrue(channelManager.generateRequests().isEmpty)
}
@Test
@@ -208,13 +206,13 @@ class TransactionMarkerChannelManagerTest {
val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
- val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+ val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
}.toMap
assertEquals(Map(broker2 -> expectedBroker2Request), firstDrainedRequests)
- val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+ val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
}.toMap
@@ -294,7 +292,7 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2)
- val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests()
+ val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests()
val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
for (requestAndHandler <- requestAndHandlers) {
@@ -342,7 +340,7 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2)
- val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests()
+ val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests()
val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
for (requestAndHandler <- requestAndHandlers) {
@@ -396,7 +394,7 @@ class TransactionMarkerChannelManagerTest {
channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2)
- val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests()
+ val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests()
val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
for (requestAndHandler <- requestAndHandlers) {
@@ -404,7 +402,7 @@ class TransactionMarkerChannelManagerTest {
}
// call this again so that append log will be retried
- senderThread.generateRequests()
+ channelManager.generateRequests()
EasyMock.verify(txnStateManager)
[2/2] kafka git commit: HOTFIX: Improve error handling for ACL
requests
Posted by ij...@apache.org.
HOTFIX: Improve error handling for ACL requests
- Use ResourceType.toJava instead of ResourceType.fromString. The latter
doesn't work for TransactionalId (or any type with two camel-case
words).
- Replace Throwable with ApiError in response classes.
- Return InvalidRequest instead of Unknown error if ANY or UNKNOWN
are provided during ACL creation.
- Rename `unknown()` to `isUnknown()` in a few places that
were missed previously.
- Add tests.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3364 from ijuma/acls-fromString-fixes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc6a3bc6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc6a3bc6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc6a3bc6
Branch: refs/heads/0.11.0
Commit: bc6a3bc6f8c74df5437005f0c1da7822fa10ed2b
Parents: 6360577
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Jun 17 14:02:01 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 17 14:04:37 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/admin/KafkaAdminClient.java | 18 ++--
.../org/apache/kafka/common/acl/AclBinding.java | 2 +-
.../kafka/common/acl/AclBindingFilter.java | 4 +-
.../apache/kafka/common/requests/ApiError.java | 10 ++
.../common/requests/CreateAclsRequest.java | 5 +-
.../common/requests/CreateAclsResponse.java | 33 ++----
.../common/requests/DeleteAclsRequest.java | 2 +-
.../common/requests/DeleteAclsResponse.java | 95 ++++++------------
.../common/requests/DescribeAclsRequest.java | 3 +-
.../common/requests/DescribeAclsResponse.java | 50 ++++------
.../kafka/common/resource/ResourceFilter.java | 2 +-
.../clients/admin/KafkaAdminClientTest.java | 100 ++++++-------------
.../apache/kafka/common/acl/AclBindingTest.java | 14 +--
.../common/requests/RequestResponseTest.java | 31 +++---
.../scala/kafka/security/SecurityUtils.scala | 17 ++--
.../main/scala/kafka/server/AdminManager.scala | 8 +-
.../kafka/server/DelayedCreateTopics.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 36 +++----
.../kafka/api/AuthorizerIntegrationTest.scala | 96 ++++++++++--------
.../api/SaslSslAdminClientIntegrationTest.scala | 32 ++++--
.../AbstractCreateTopicsRequestTest.scala | 4 +-
.../processor/internals/StreamsKafkaClient.java | 2 +-
22 files changed, 251 insertions(+), 317 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e92b1d3..881f8d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1277,8 +1277,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
DescribeAclsResponse response = (DescribeAclsResponse) abstractResponse;
- if (response.throwable() != null) {
- future.completeExceptionally(response.throwable());
+ if (response.error().isFailure()) {
+ future.completeExceptionally(response.error().exception());
} else {
future.complete(response.acls());
}
@@ -1330,8 +1330,8 @@ public class KafkaAdminClient extends AdminClient {
"The broker reported no creation result for the given ACL."));
} else {
AclCreationResponse creation = iter.next();
- if (creation.throwable() != null) {
- future.completeExceptionally(creation.throwable());
+ if (creation.error().isFailure()) {
+ future.completeExceptionally(creation.error().exception());
} else {
future.complete(null);
}
@@ -1378,12 +1378,12 @@ public class KafkaAdminClient extends AdminClient {
"The broker reported no deletion result for the given filter."));
} else {
AclFilterResponse deletion = iter.next();
- if (deletion.throwable() != null) {
- future.completeExceptionally(deletion.throwable());
+ if (deletion.error().isFailure()) {
+ future.completeExceptionally(deletion.error().exception());
} else {
List<FilterResult> filterResults = new ArrayList<>();
for (AclDeletionResult deletionResult : deletion.deletions()) {
- filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.exception()));
+ filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.error().exception()));
}
future.complete(new FilterResults(filterResults));
}
@@ -1433,7 +1433,7 @@ public class KafkaAdminClient extends AdminClient {
ConfigResource configResource = entry.getKey();
KafkaFutureImpl<Config> future = entry.getValue();
DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
- if (!config.error().is(Errors.NONE)) {
+ if (config.error().isFailure()) {
future.completeExceptionally(config.error().exception());
continue;
}
@@ -1469,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient {
DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
DescribeConfigsResponse.Config config = response.configs().get(resource);
- if (!config.error().is(Errors.NONE))
+ if (config.error().isFailure())
brokerFuture.completeExceptionally(config.error().exception());
else {
List<ConfigEntry> configEntries = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
index ea58434..d264ef1 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -48,7 +48,7 @@ public class AclBinding {
/**
* Return true if this binding has any UNKNOWN components.
*/
- public boolean unknown() {
+ public boolean isUnknown() {
return resource.isUnknown() || entry.isUnknown();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index 807b730..64f16cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -56,8 +56,8 @@ public class AclBindingFilter {
/**
* Return true if this filter has any UNKNOWN components.
*/
- public boolean unknown() {
- return resourceFilter.unknown() || entryFilter.isUnknown();
+ public boolean isUnknown() {
+ return resourceFilter.isUnknown() || entryFilter.isUnknown();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 26034eb..d712123 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.protocol.types.Struct;
*/
public class ApiError {
+ public static final ApiError NONE = new ApiError(Errors.NONE, null);
+
private static final String CODE_KEY_NAME = "error_code";
private static final String MESSAGE_KEY_NAME = "error_message";
@@ -67,6 +69,14 @@ public class ApiError {
return this.error == error;
}
+ public boolean isFailure() {
+ return !isSuccess();
+ }
+
+ public boolean isSuccess() {
+ return is(Errors.NONE);
+ }
+
public Errors error() {
return error;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 757b5af..3598d4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -117,9 +117,8 @@ public class CreateAclsRequest extends AbstractRequest {
switch (versionId) {
case 0:
List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
- for (int i = 0; i < aclCreations.size(); i++) {
- responses.add(new CreateAclsResponse.AclCreationResponse(throwable));
- }
+ for (int i = 0; i < aclCreations.size(); i++)
+ responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
return new CreateAclsResponse(throttleTimeMs, responses);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index c84b97c..1fc75da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -26,23 +25,21 @@ import java.util.List;
public class CreateAclsResponse extends AbstractResponse {
private final static String CREATION_RESPONSES = "creation_responses";
- private final static String ERROR_CODE = "error_code";
- private final static String ERROR_MESSAGE = "error_message";
public static class AclCreationResponse {
- private final Throwable throwable;
+ private final ApiError error;
- public AclCreationResponse(Throwable throwable) {
- this.throwable = throwable;
+ public AclCreationResponse(ApiError error) {
+ this.error = error;
}
- public Throwable throwable() {
- return throwable;
+ public ApiError error() {
+ return error;
}
@Override
public String toString() {
- return "(" + throwable + ")";
+ return "(" + error + ")";
}
}
@@ -60,14 +57,8 @@ public class CreateAclsResponse extends AbstractResponse {
this.aclCreationResponses = new ArrayList<>();
for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
Struct responseStruct = (Struct) responseStructObj;
- short errorCode = responseStruct.getShort(ERROR_CODE);
- String errorMessage = responseStruct.getString(ERROR_MESSAGE);
- if (errorCode != 0) {
- this.aclCreationResponses.add(new AclCreationResponse(
- Errors.forCode(errorCode).exception(errorMessage)));
- } else {
- this.aclCreationResponses.add(new AclCreationResponse(null));
- }
+ ApiError error = new ApiError(responseStruct);
+ this.aclCreationResponses.add(new AclCreationResponse(error));
}
}
@@ -78,13 +69,7 @@ public class CreateAclsResponse extends AbstractResponse {
List<Struct> responseStructs = new ArrayList<>();
for (AclCreationResponse response : aclCreationResponses) {
Struct responseStruct = struct.instance(CREATION_RESPONSES);
- if (response.throwable() == null) {
- responseStruct.set(ERROR_CODE, (short) 0);
- } else {
- Errors errors = Errors.forException(response.throwable());
- responseStruct.set(ERROR_CODE, errors.code());
- responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
- }
+ response.error.write(responseStruct);
responseStructs.add(responseStruct);
}
struct.set(CREATION_RESPONSES, responseStructs.toArray());
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 246b5e5..c05bec6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -96,7 +96,7 @@ public class DeleteAclsRequest extends AbstractRequest {
List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
for (int i = 0; i < filters.size(); i++) {
responses.add(new DeleteAclsResponse.AclFilterResponse(
- throwable, Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+ ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
}
return new DeleteAclsResponse(throttleTimeMs, responses);
default:
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 94cd6aa..973aa8e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -18,9 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.utils.Utils;
@@ -30,31 +28,28 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
public class DeleteAclsResponse extends AbstractResponse {
public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class);
private final static String FILTER_RESPONSES = "filter_responses";
- private final static String ERROR_CODE = "error_code";
- private final static String ERROR_MESSAGE = "error_message";
private final static String MATCHING_ACLS = "matching_acls";
public static class AclDeletionResult {
- private final ApiException exception;
+ private final ApiError error;
private final AclBinding acl;
- public AclDeletionResult(ApiException exception, AclBinding acl) {
- this.exception = exception;
+ public AclDeletionResult(ApiError error, AclBinding acl) {
+ this.error = error;
this.acl = acl;
}
public AclDeletionResult(AclBinding acl) {
- this(null, acl);
+ this(ApiError.NONE, acl);
}
- public ApiException exception() {
- return exception;
+ public ApiError error() {
+ return error;
}
public AclBinding acl() {
@@ -63,25 +58,25 @@ public class DeleteAclsResponse extends AbstractResponse {
@Override
public String toString() {
- return "(apiException=" + exception + ", acl=" + acl + ")";
+ return "(error=" + error + ", acl=" + acl + ")";
}
}
public static class AclFilterResponse {
- private final Throwable throwable;
+ private final ApiError error;
private final Collection<AclDeletionResult> deletions;
- public AclFilterResponse(Throwable throwable, Collection<AclDeletionResult> deletions) {
- this.throwable = throwable;
+ public AclFilterResponse(ApiError error, Collection<AclDeletionResult> deletions) {
+ this.error = error;
this.deletions = deletions;
}
public AclFilterResponse(Collection<AclDeletionResult> deletions) {
- this(null, deletions);
+ this(ApiError.NONE, deletions);
}
- public Throwable throwable() {
- return throwable;
+ public ApiError error() {
+ return error;
}
public Collection<AclDeletionResult> deletions() {
@@ -90,7 +85,7 @@ public class DeleteAclsResponse extends AbstractResponse {
@Override
public String toString() {
- return "(throwable=" + throwable + ", deletions=" + Utils.join(deletions, ",") + ")";
+ return "(error=" + error + ", deletions=" + Utils.join(deletions, ",") + ")";
}
}
@@ -108,29 +103,16 @@ public class DeleteAclsResponse extends AbstractResponse {
this.responses = new ArrayList<>();
for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
Struct responseStruct = (Struct) responseStructObj;
- short responseErrorCode = responseStruct.getShort(ERROR_CODE);
- String responseErrorMessage = responseStruct.getString(ERROR_MESSAGE);
- if (responseErrorCode != 0) {
- this.responses.add(new AclFilterResponse(
- Errors.forCode(responseErrorCode).exception(responseErrorMessage),
- Collections.<AclDeletionResult>emptySet()));
- } else {
- List<AclDeletionResult> deletions = new ArrayList<>();
- for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) {
- Struct matchingAclStruct = (Struct) matchingAclStructObj;
- short matchErrorCode = matchingAclStruct.getShort(ERROR_CODE);
- ApiException exception = null;
- if (matchErrorCode != 0) {
- Errors errors = Errors.forCode(matchErrorCode);
- String matchErrorMessage = matchingAclStruct.getString(ERROR_MESSAGE);
- exception = errors.exception(matchErrorMessage);
- }
- AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
- Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct);
- deletions.add(new AclDeletionResult(exception, new AclBinding(resource, entry)));
- }
- this.responses.add(new AclFilterResponse(null, deletions));
+ ApiError error = new ApiError(responseStruct);
+ List<AclDeletionResult> deletions = new ArrayList<>();
+ for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) {
+ Struct matchingAclStruct = (Struct) matchingAclStructObj;
+ ApiError matchError = new ApiError(matchingAclStruct);
+ AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
+ Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct);
+ deletions.add(new AclDeletionResult(matchError, new AclBinding(resource, entry)));
}
+ this.responses.add(new AclFilterResponse(error, deletions));
}
}
@@ -141,29 +123,16 @@ public class DeleteAclsResponse extends AbstractResponse {
List<Struct> responseStructs = new ArrayList<>();
for (AclFilterResponse response : responses) {
Struct responseStruct = struct.instance(FILTER_RESPONSES);
- if (response.throwable() != null) {
- Errors error = Errors.forException(response.throwable());
- responseStruct.set(ERROR_CODE, error.code());
- responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
- responseStruct.set(MATCHING_ACLS, new Struct[0]);
- } else {
- responseStruct.set(ERROR_CODE, (short) 0);
- List<Struct> deletionStructs = new ArrayList<>();
- for (AclDeletionResult deletion : response.deletions()) {
- Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
- if (deletion.exception() != null) {
- Errors error = Errors.forException(deletion.exception);
- deletionStruct.set(ERROR_CODE, error.code());
- deletionStruct.set(ERROR_MESSAGE, deletion.exception.getMessage());
- } else {
- deletionStruct.set(ERROR_CODE, (short) 0);
- }
- RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
- RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
- deletionStructs.add(deletionStruct);
- }
- responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0]));
+ response.error.write(responseStruct);
+ List<Struct> deletionStructs = new ArrayList<>();
+ for (AclDeletionResult deletion : response.deletions()) {
+ Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
+ deletion.error.write(deletionStruct);
+ RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
+ RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
+ deletionStructs.add(deletionStruct);
}
+ responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0]));
responseStructs.add(responseStruct);
}
struct.set(FILTER_RESPONSES, responseStructs.toArray());
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 6573b6e..58ce539 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -73,7 +73,8 @@ public class DescribeAclsRequest extends AbstractRequest {
short versionId = version();
switch (versionId) {
case 0:
- return new DescribeAclsResponse(throttleTimeMs, throwable, Collections.<AclBinding>emptySet());
+ return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable),
+ Collections.<AclBinding>emptySet());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index cf21aa6..993a45f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -20,51 +20,41 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DescribeAclsResponse extends AbstractResponse {
- private final static String ERROR_CODE = "error_code";
- private final static String ERROR_MESSAGE = "error_message";
private final static String RESOURCES = "resources";
private final static String ACLS = "acls";
private final int throttleTimeMs;
- private final Throwable throwable;
+ private final ApiError error;
private final Collection<AclBinding> acls;
- public DescribeAclsResponse(int throttleTimeMs, Throwable throwable, Collection<AclBinding> acls) {
+ public DescribeAclsResponse(int throttleTimeMs, ApiError error, Collection<AclBinding> acls) {
this.throttleTimeMs = throttleTimeMs;
- this.throwable = throwable;
+ this.error = error;
this.acls = acls;
}
public DescribeAclsResponse(Struct struct) {
this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
- Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
- if (error != Errors.NONE) {
- this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
- this.acls = Collections.emptySet();
- } else {
- this.throwable = null;
- this.acls = new ArrayList<>();
- for (Object resourceStructObj : struct.getArray(RESOURCES)) {
- Struct resourceStruct = (Struct) resourceStructObj;
- Resource resource = RequestUtils.resourceFromStructFields(resourceStruct);
- for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
- Struct aclDataStruct = (Struct) aclDataStructObj;
- AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct);
- this.acls.add(new AclBinding(resource, entry));
- }
+ this.error = new ApiError(struct);
+ this.acls = new ArrayList<>();
+ for (Object resourceStructObj : struct.getArray(RESOURCES)) {
+ Struct resourceStruct = (Struct) resourceStructObj;
+ Resource resource = RequestUtils.resourceFromStructFields(resourceStruct);
+ for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
+ Struct aclDataStruct = (Struct) aclDataStructObj;
+ AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct);
+ this.acls.add(new AclBinding(resource, entry));
}
}
}
@@ -73,15 +63,8 @@ public class DescribeAclsResponse extends AbstractResponse {
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
- if (throwable != null) {
- Errors errors = Errors.forException(throwable);
- struct.set(ERROR_CODE, errors.code());
- struct.set(ERROR_MESSAGE, throwable.getMessage());
- struct.set(RESOURCES, new Struct[0]);
- return struct;
- }
- struct.set(ERROR_CODE, (short) 0);
- struct.set(ERROR_MESSAGE, null);
+ error.write(struct);
+
Map<Resource, List<AccessControlEntry>> resourceToData = new HashMap<>();
for (AclBinding acl : acls) {
List<AccessControlEntry> entry = resourceToData.get(acl.resource());
@@ -91,6 +74,7 @@ public class DescribeAclsResponse extends AbstractResponse {
}
entry.add(acl.entry());
}
+
List<Struct> resourceStructs = new ArrayList<>();
for (Map.Entry<Resource, List<AccessControlEntry>> tuple : resourceToData.entrySet()) {
Resource resource = tuple.getKey();
@@ -113,8 +97,8 @@ public class DescribeAclsResponse extends AbstractResponse {
return throttleTimeMs;
}
- public Throwable throwable() {
- return throwable;
+ public ApiError error() {
+ return error;
}
public Collection<AclBinding> acls() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 5032660..0a4611f 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -70,7 +70,7 @@ public class ResourceFilter {
/**
* Return true if this ResourceFilter has any UNKNOWN components.
*/
- public boolean unknown() {
+ public boolean isUnknown() {
return resourceType.isUnknown();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cd6ed6b..8300e0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -48,8 +48,6 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -60,6 +58,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -178,7 +177,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(new Node(0, "localhost", 8121));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))),
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
new CreateTopicsOptions().timeoutMs(1000)).all();
assertFutureError(future, TimeoutException.class);
}
@@ -192,7 +191,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))),
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
new CreateTopicsOptions().timeoutMs(10000)).all();
future.get();
}
@@ -215,21 +214,18 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(env.cluster().controller());
// Test a call where we get back ACL1 and ACL2.
- env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
- new ArrayList<AclBinding>() {{
- add(ACL1);
- add(ACL2);
- }}));
+ env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
+ asList(ACL1, ACL2)));
assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2);
// Test a call where we get back no results.
- env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
+ env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
Collections.<AclBinding>emptySet()));
assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
// Test a call where we get back an error.
env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
- new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet()));
+ new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
}
}
@@ -243,30 +239,19 @@ public class KafkaAdminClientTest {
// Test a call where we successfully create two ACLs.
env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
- new ArrayList<AclCreationResponse>() {{
- add(new AclCreationResponse(null));
- add(new AclCreationResponse(null));
- }}));
- CreateAclsResult results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
- add(ACL1);
- add(ACL2);
- }});
+ asList(new AclCreationResponse(ApiError.NONE), new AclCreationResponse(ApiError.NONE))));
+ CreateAclsResult results = env.adminClient().createAcls(asList(ACL1, ACL2));
assertCollectionIs(results.values().keySet(), ACL1, ACL2);
- for (KafkaFuture<Void> future : results.values().values()) {
+ for (KafkaFuture<Void> future : results.values().values())
future.get();
- }
results.all().get();
// Test a call where we fail to create one ACL.
- env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
- new ArrayList<AclCreationResponse>() {{
- add(new AclCreationResponse(new SecurityDisabledException("Security is disabled")));
- add(new AclCreationResponse(null));
- }}));
- results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
- add(ACL1);
- add(ACL2);
- }});
+ env.kafkaClient().prepareResponse(new CreateAclsResponse(0, asList(
+ new AclCreationResponse(new ApiError(Errors.SECURITY_DISABLED, "Security is disabled")),
+ new AclCreationResponse(ApiError.NONE))
+ ));
+ results = env.adminClient().createAcls(asList(ACL1, ACL2));
assertCollectionIs(results.values().keySet(), ACL1, ACL2);
assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
results.values().get(ACL2).get();
@@ -282,19 +267,11 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(env.cluster().controller());
// Test a call where one filter has an error.
- env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
- add(new AclFilterResponse(null,
- new ArrayList<AclDeletionResult>() {{
- add(new AclDeletionResult(null, ACL1));
- add(new AclDeletionResult(null, ACL2));
- }}));
- add(new AclFilterResponse(new SecurityDisabledException("No security"),
- Collections.<AclDeletionResult>emptySet()));
- }}));
- DeleteAclsResult results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{
- add(FILTER1);
- add(FILTER2);
- }});
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+ new AclFilterResponse(asList(new AclDeletionResult(ACL1), new AclDeletionResult(ACL2))),
+ new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
+ Collections.<AclDeletionResult>emptySet()))));
+ DeleteAclsResult results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values();
FilterResults filter1Results = filterResults.get(FILTER1).get();
assertEquals(null, filter1Results.values().get(0).exception());
@@ -305,38 +282,19 @@ public class KafkaAdminClientTest {
assertFutureError(results.all(), SecurityDisabledException.class);
// Test a call where one deletion result has an error.
- env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
- add(new AclFilterResponse(null,
- new ArrayList<AclDeletionResult>() {{
- add(new AclDeletionResult(null, ACL1));
- add(new AclDeletionResult(new SecurityDisabledException("No security"), ACL2));
- }}));
- add(new AclFilterResponse(null, Collections.<AclDeletionResult>emptySet()));
- }}));
- results = env.adminClient().deleteAcls(
- new ArrayList<AclBindingFilter>() {{
- add(FILTER1);
- add(FILTER2);
- }});
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+ new AclFilterResponse(asList(new AclDeletionResult(ACL1),
+ new AclDeletionResult(new ApiError(Errors.SECURITY_DISABLED, "No security"), ACL2))),
+ new AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
+ results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
assertTrue(results.values().get(FILTER2).get().values().isEmpty());
assertFutureError(results.all(), SecurityDisabledException.class);
// Test a call where there are no errors.
- env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
- add(new AclFilterResponse(null,
- new ArrayList<AclDeletionResult>() {{
- add(new AclDeletionResult(null, ACL1));
- }}));
- add(new AclFilterResponse(null,
- new ArrayList<AclDeletionResult>() {{
- add(new AclDeletionResult(null, ACL2));
- }}));
- }}));
- results = env.adminClient().deleteAcls(
- new ArrayList<AclBindingFilter>() {{
- add(FILTER1);
- add(FILTER2);
- }});
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+ new AclFilterResponse(asList(new AclDeletionResult(ACL1))),
+ new AclFilterResponse(asList(new AclDeletionResult(ACL2))))));
+ results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
Collection<AclBinding> deleted = results.all().get();
assertCollectionIs(deleted, ACL1, ACL2);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index e0a0598..0ebcdfe 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -92,13 +92,13 @@ public class AclBindingTest {
@Test
public void testUnknowns() throws Exception {
- assertFalse(ACL1.unknown());
- assertFalse(ACL2.unknown());
- assertFalse(ACL3.unknown());
- assertFalse(ANY_ANONYMOUS.unknown());
- assertFalse(ANY_DENY.unknown());
- assertFalse(ANY_MYTOPIC.unknown());
- assertTrue(UNKNOWN_ACL.unknown());
+ assertFalse(ACL1.isUnknown());
+ assertFalse(ACL2.isUnknown());
+ assertFalse(ACL3.isUnknown());
+ assertFalse(ANY_ANONYMOUS.isUnknown());
+ assertFalse(ANY_DENY.isUnknown());
+ assertFalse(ANY_MYTOPIC.isUnknown());
+ assertTrue(UNKNOWN_ACL.isUnknown());
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 327f228..467afb3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -226,7 +225,7 @@ public class RequestResponseTest {
checkResponse(createTxnOffsetCommitResponse(), 0);
checkRequest(createListAclsRequest());
checkErrorResponse(createListAclsRequest(), new SecurityDisabledException("Security is not enabled."));
- checkResponse(createListAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion());
+ checkResponse(createDescribeAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion());
checkRequest(createCreateAclsRequest());
checkErrorResponse(createCreateAclsRequest(), new SecurityDisabledException("Security is not enabled."));
checkResponse(createCreateAclsResponse(), ApiKeys.CREATE_ACLS.latestVersion());
@@ -1001,8 +1000,8 @@ public class RequestResponseTest {
new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build();
}
- private DescribeAclsResponse createListAclsResponse() {
- return new DescribeAclsResponse(0, null, Collections.singleton(new AclBinding(
+ private DescribeAclsResponse createDescribeAclsResponse() {
+ return new DescribeAclsResponse(0, ApiError.NONE, Collections.singleton(new AclBinding(
new Resource(ResourceType.TOPIC, "mytopic"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))));
}
@@ -1019,8 +1018,8 @@ public class RequestResponseTest {
}
private CreateAclsResponse createCreateAclsResponse() {
- return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(null),
- new AclCreationResponse(new InvalidRequestException("Foo bar"))));
+ return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(ApiError.NONE),
+ new AclCreationResponse(new ApiError(Errors.INVALID_REQUEST, "Foo bar"))));
}
private DeleteAclsRequest createDeleteAclsRequest() {
@@ -1036,16 +1035,14 @@ public class RequestResponseTest {
private DeleteAclsResponse createDeleteAclsResponse() {
List<AclFilterResponse> responses = new ArrayList<>();
- responses.add(new AclFilterResponse(null,
- new HashSet<AclDeletionResult>() {{
- add(new AclDeletionResult(null, new AclBinding(
+ responses.add(new AclFilterResponse(Utils.mkSet(
+ new AclDeletionResult(new AclBinding(
new Resource(ResourceType.TOPIC, "mytopic3"),
- new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))));
- add(new AclDeletionResult(null, new AclBinding(
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
+ new AclDeletionResult(new AclBinding(
new Resource(ResourceType.TOPIC, "mytopic4"),
- new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))));
- }}));
- responses.add(new AclFilterResponse(new SecurityDisabledException("No security"),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))))));
+ responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
Collections.<AclDeletionResult>emptySet()));
return new DeleteAclsResponse(0, responses);
}
@@ -1071,9 +1068,9 @@ public class RequestResponseTest {
new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true)
);
configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config(
- new ApiError(Errors.NONE, null), configEntries));
+ ApiError.NONE, configEntries));
configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new DescribeConfigsResponse.Config(
- new ApiError(Errors.NONE, null), Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
+ ApiError.NONE, Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
return new DescribeConfigsResponse(200, configs);
}
@@ -1091,7 +1088,7 @@ public class RequestResponseTest {
private AlterConfigsResponse createAlterConfigsResponse() {
Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new HashMap<>();
- errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new ApiError(Errors.NONE, null));
+ errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), ApiError.NONE);
errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
return new AlterConfigsResponse(20, errors);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/security/SecurityUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index bbfc42c..573a16b 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -19,27 +19,32 @@ package kafka.security
import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
-import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.common.resource.{Resource => AdminResource}
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
object SecurityUtils {
- def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] = {
- for {
+ def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
+ (for {
resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
resource = Resource(resourceType, filter.resourceFilter.name)
acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
- } yield (resource, acl)
+ } yield (resource, acl)) match {
+ case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
+ case Success(s) => Right(s)
+ }
}
def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
- val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name)
+ val adminResource = new AdminResource(resource.resourceType.toJava, resource.name)
val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
acl.operation.toJava, acl.permissionType.toJava)
new AclBinding(adminResource, entry)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 33c6b77..84972f3 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -119,7 +119,7 @@ class AdminManager(val config: KafkaConfig,
else
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
}
- CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, null))
+ CreateTopicMetadata(topic, assignments, ApiError.NONE)
} catch {
// Log client errors at a lower level than unexpected exceptions
case e@ (_: PolicyViolationException | _: ApiException) =>
@@ -135,7 +135,7 @@ class AdminManager(val config: KafkaConfig,
if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
val results = metadata.map { createTopicMetadata =>
// ignore topics that already have errors
- if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) {
+ if (createTopicMetadata.error.isSuccess() && !validateOnly) {
(createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
} else {
(createTopicMetadata.topic, createTopicMetadata.error)
@@ -212,7 +212,7 @@ class AdminManager(val config: KafkaConfig,
new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly)
}
- new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), configEntries.asJava)
+ new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
}
try {
@@ -280,7 +280,7 @@ class AdminManager(val config: KafkaConfig,
else
AdminUtils.changeTopicConfig(zkUtils, topic, properties)
}
- resource -> new ApiError(Errors.NONE, null)
+ resource -> ApiError.NONE
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
index abf6bc0..83cdd67 100644
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -48,7 +48,7 @@ class DelayedCreateTopics(delayMs: Long,
override def tryComplete() : Boolean = {
trace(s"Trying to complete operation for $createMetadata")
- val leaderlessPartitionCount = createMetadata.filter(_.error.is(Errors.NONE))
+ val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess)
.foldLeft(0) { case (topicCounter, metadata) =>
topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet)
}
@@ -69,7 +69,7 @@ class DelayedCreateTopics(delayMs: Long,
trace(s"Completing operation for $createMetadata")
val results = createMetadata.map { metadata =>
// ignore topics that already have errors
- if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
+ if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
(metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
else
(metadata.topic, metadata.error)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 337c740..a4fd30c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{Resource => AdminResource}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import scala.collection._
@@ -1763,22 +1763,19 @@ class KafkaApis(val requestChannel: RequestChannel,
case None =>
sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeAclsResponse(requestThrottleMs,
- new SecurityDisabledException("No Authorizer is configured on the broker."),
- Collections.emptySet()))
+ new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is configured on the broker"), Collections.emptySet()))
case Some(auth) =>
val filter = describeAclsRequest.filter()
- val returnedAcls = new util.ArrayList[AclBinding]
- val aclMap = auth.getAcls()
- aclMap.foreach { case (resource, acls) =>
- acls.foreach { acl =>
- val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+ val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
+ acls.flatMap { acl =>
+ val fixture = new AclBinding(new AdminResource(resource.resourceType.toJava, resource.name),
new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
- if (filter.matches(fixture))
- returnedAcls.add(fixture)
+ if (filter.matches(fixture)) Some(fixture)
+ else None
}
}
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new DescribeAclsResponse(requestThrottleMs, null, returnedAcls))
+ new DescribeAclsResponse(requestThrottleMs, ApiError.NONE, returnedAcls.asJava))
}
}
@@ -1793,8 +1790,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case Some(auth) =>
val aclCreationResults = createAclsRequest.aclCreations.asScala.map { aclCreation =>
SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) match {
- case Failure(throwable) => new AclCreationResponse(throwable)
- case Success((resource, acl)) => try {
+ case Left(apiError) => new AclCreationResponse(apiError)
+ case Right((resource, acl)) => try {
if (resource.resourceType.equals(Cluster) &&
!resource.name.equals(Resource.ClusterResourceName))
throw new InvalidRequestException("The only valid name for the CLUSTER resource is " +
@@ -1805,11 +1802,11 @@ class KafkaApis(val requestChannel: RequestChannel,
logger.debug(s"Added acl $acl to $resource")
- new AclCreationResponse(null)
+ new AclCreationResponse(ApiError.NONE)
} catch {
case throwable: Throwable =>
logger.debug(s"Failed to add acl $acl to $resource", throwable)
- new AclCreationResponse(throwable)
+ new AclCreationResponse(ApiError.fromThrowable(throwable))
}
}
}
@@ -1835,8 +1832,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// Delete based on a list of ACL fixtures.
for ((filter, i) <- filters.zipWithIndex) {
SecurityUtils.convertToResourceAndAcl(filter) match {
- case Failure(throwable) => filterResponseMap.put(i, new AclFilterResponse(throwable, Seq.empty.asJava))
- case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture))
+ case Left(apiError) => filterResponseMap.put(i, new AclFilterResponse(apiError, Seq.empty.asJava))
+ case Right(binding) => toDelete.put(i, ArrayBuffer(binding))
}
}
} else {
@@ -1845,7 +1842,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val filtersWithIndex = filters.zipWithIndex
for ((resource, acls) <- aclMap; acl <- acls) {
val binding = new AclBinding(
- new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+ new AdminResource(resource.resourceType.toJava, resource.name),
new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
acl.permissionType.toJava))
@@ -1863,8 +1860,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else None
} catch {
case throwable: Throwable =>
- Some(new AclDeletionResult(new UnknownServerException(s"Failed to delete ACL $acl: $throwable"),
- aclBinding))
+ Some(new AclDeletionResult(ApiError.fromThrowable(throwable), aclBinding))
}
}.asJava
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bd04c7b..09ff9be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,7 +14,7 @@ package kafka.api
import java.nio.ByteBuffer
import java.util
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
@@ -22,14 +22,24 @@ import kafka.common.TopicAndPartition
import kafka.security.auth._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
+import kafka.admin.AdminUtils
+import kafka.log.LogConfig
+import kafka.network.SocketServer
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
+import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Node, TopicPartition, requests}
import org.junit.Assert._
@@ -38,13 +48,6 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
-import org.apache.kafka.common.KafkaException
-import kafka.admin.AdminUtils
-import kafka.log.LogConfig
-import kafka.network.SocketServer
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
class AuthorizerIntegrationTest extends BaseRequestTest {
@@ -72,6 +75,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+ val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
+ val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
@@ -125,7 +130,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_PARTITIONS_TO_TXN -> classOf[AddPartitionsToTxnResponse],
ApiKeys.ADD_OFFSETS_TO_TXN -> classOf[AddOffsetsToTxnResponse],
ApiKeys.END_TXN -> classOf[EndTxnResponse],
- ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse]
+ ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse],
+ ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
+ ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
+ ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse]
)
val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -156,7 +164,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
- ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp))
+ ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
+ ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
+ ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
+ ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error)
)
val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -185,7 +196,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl),
ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl),
ApiKeys.END_TXN -> transactionIdWriteAcl,
- ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl)
+ ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl),
+ ApiKeys.CREATE_ACLS -> clusterAlterAcl,
+ ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
+ ApiKeys.DELETE_ACLS -> clusterAlterAcl
)
@Before
@@ -284,46 +298,47 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
build()
}
- private def createHeartbeatRequest = {
- new HeartbeatRequest.Builder(group, 1, "").build()
- }
+ private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build()
- private def createLeaveGroupRequest = {
- new LeaveGroupRequest.Builder(group, "").build()
- }
+ private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
- private def createLeaderAndIsrRequest = {
+ private def leaderAndIsrRequest = {
new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
Set(new Node(brokerId, "localhost", 0)).asJava).build()
}
- private def createStopReplicaRequest = {
- new requests.StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
- }
+ private def stopReplicaRequest = new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
- private def createControlledShutdownRequest = {
- new requests.ControlledShutdownRequest.Builder(brokerId).build()
- }
+ private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId).build()
- private def createTopicsRequest = {
+ private def createTopicsRequest =
new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
- }
- private def deleteTopicsRequest = {
- new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
- }
+ private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
- private def createDescribeConfigsRequest =
+ private def describeConfigsRequest =
new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build()
- private def createAlterConfigsRequest =
+ private def alterConfigsRequest =
new AlterConfigsRequest.Builder(
Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
new AlterConfigsRequest.Config(Collections.singleton(
new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
))), true).build()
+ private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
+
+ private def createAclsRequest = new CreateAclsRequest.Builder(
+ Collections.singletonList(new AclCreation(new AclBinding(
+ new AdminResource(AdminResourceType.TOPIC, "mytopic"),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
+
+ private def deleteAclsRequest = new DeleteAclsRequest.Builder(
+ Collections.singletonList(new AclBindingFilter(
+ new ResourceFilter(AdminResourceType.TOPIC, null),
+ new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
+
@Test
def testAuthorizationWithTopicExisting() {
@@ -338,16 +353,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
ApiKeys.SYNC_GROUP -> createSyncGroupRequest,
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
- ApiKeys.HEARTBEAT -> createHeartbeatRequest,
- ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest,
- ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest,
- ApiKeys.STOP_REPLICA -> createStopReplicaRequest,
- ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
+ ApiKeys.HEARTBEAT -> heartbeatRequest,
+ ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
+ ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
+ ApiKeys.STOP_REPLICA -> stopReplicaRequest,
+ ApiKeys.CONTROLLED_SHUTDOWN_KEY -> controlledShutdownRequest,
ApiKeys.CREATE_TOPICS -> createTopicsRequest,
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
- ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest,
- ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest
+ ApiKeys.DESCRIBE_CONFIGS -> describeConfigsRequest,
+ ApiKeys.ALTER_CONFIGS -> alterConfigsRequest,
+ ApiKeys.CREATE_ACLS -> createAclsRequest,
+ ApiKeys.DELETE_ACLS -> deleteAclsRequest,
+ ApiKeys.DESCRIBE_ACLS -> describeAclsRequest
)
for ((key, request) <- requestKeyToRequest) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index b4e09b3..03afc9e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -31,13 +31,13 @@ import scala.util.{Failure, Success, Try}
class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
- this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
override def configureSecurityBeforeServersStart() {
- val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName())
+ val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
try {
authorizer.configure(this.configs.head.originals())
authorizer.addAcls(Set(new AuthAcl(AuthAcl.WildCardPrincipal, Allow,
@@ -92,6 +92,8 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+ val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id"),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
@Test
override def testAclOperations(): Unit = {
@@ -116,28 +118,33 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
TestUtils.waitUntilTrue(() => {
val results = client.describeAcls(filter).values.get()
acls == results.asScala.toSet
- }, "timed out waiting for ACLs")
+ }, s"timed out waiting for ACLs $acls")
}
@Test
def testAclOperations2(): Unit = {
client = AdminClient.create(createConfig())
- val results = client.createAcls(List(acl2, acl2).asJava)
- assertEquals(Set(acl2, acl2), results.values.keySet().asScala)
+ val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
+ assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala)
results.all.get()
waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
+ waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY)
val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
+ val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null), AccessControlEntryFilter.ANY)
waitForDescribeAcls(client, filterA, Set())
+ waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
- val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
- assertEquals(Set(filterA, filterB), results2.values.keySet().asScala)
+ val results2 = client.deleteAcls(List(filterA, filterB, filterC).asJava, new DeleteAclsOptions())
+ assertEquals(Set(filterA, filterB, filterC), results2.values.keySet.asScala)
assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
+ assertEquals(Set(transactionalIdAcl), results2.values.get(filterC).get.values.asScala.map(_.binding).toSet)
assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
waitForDescribeAcls(client, filterB, Set())
+ waitForDescribeAcls(client, filterC, Set())
}
@Test
@@ -161,7 +168,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
private def testAclCreateGetDelete(expectAuth: Boolean): Unit = {
TestUtils.waitUntilTrue(() => {
- val result = client.createAcls(List(fooAcl).asJava, new CreateAclsOptions)
+ val result = client.createAcls(List(fooAcl, transactionalIdAcl).asJava, new CreateAclsOptions)
if (expectAuth) {
Try(result.all.get) match {
case Failure(e) =>
@@ -180,9 +187,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
}, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail"))
if (expectAuth) {
waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl))
+ waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
}
TestUtils.waitUntilTrue(() => {
- val result = client.deleteAcls(List(fooAcl.toFilter).asJava, new DeleteAclsOptions)
+ val result = client.deleteAcls(List(fooAcl.toFilter, transactionalIdAcl.toFilter).asJava, new DeleteAclsOptions)
if (expectAuth) {
Try(result.all.get) match {
case Failure(e) =>
@@ -196,13 +204,17 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
verifyCauseIsClusterAuth(e)
true
case Success(_) =>
+ assertEquals(Set(fooAcl, transactionalIdAcl), result.values.keySet)
assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet)
+ assertEquals(Set(transactionalIdAcl),
+ result.values.get(transactionalIdAcl.toFilter).get.values.asScala.map(_.binding).toSet)
true
}
}
}, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail"))
if (expectAuth) {
- waitForDescribeAcls(client, fooAcl.toFilter, Set())
+ waitForDescribeAcls(client, fooAcl.toFilter, Set.empty)
+ waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set.empty)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 0ef3405..d89a9df 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -36,7 +36,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
val response = sendCreateTopicRequest(request)
- val error = response.errors.values.asScala.find(!_.is(Errors.NONE))
+ val error = response.errors.values.asScala.find(_.isFailure)
assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
request.topics.asScala.foreach { case (topic, details) =>
@@ -118,7 +118,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
assertEquals(expected.messageWithFallback, actual.messageWithFallback)
}
// If no error validate topic exists
- if (expectedError.is(Errors.NONE) && !request.validateOnly) {
+ if (expectedError.isSuccess && !request.validateOnly) {
validateTopicExists(topic)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 2d47876..ebb3344 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -193,7 +193,7 @@ public class StreamsKafkaClient {
for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name());
- if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
+ if (error.isFailure() && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
}
}