Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:06:02 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516363883



##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##########
@@ -70,12 +74,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-object DynamicBrokerReconfigurationTest {
-  val SecureInternal = "INTERNAL"
-  val SecureExternal = "EXTERNAL"
-}
-
-class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+@RunWith(value = classOf[Parameterized])
+class DynamicBrokerReconfigurationTest(quorumBasedController: JBoolean) extends ZooKeeperTestHarness with SaslSetup {

Review comment:
       It is quite expensive to parameterize these test cases. I am not sure it is worthwhile. If forwarding works for one of these cases, why would the others be different? Since we are not planning to enable this feature yet, I think unit tests in `KafkaApisTest` and maybe one integration test are good enough.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -352,6 +352,8 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
+  private[server] val enableMetadataQuorumProp = "enable.metadata.quorum"

Review comment:
       nit: every other property name uses a capital first letter

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the the purpose of inter-broker forwarding.
+ * Any serialization/deserialization failure should raise a {@link SerializationException} to be consistent.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);

Review comment:
       Can you add a javadoc for these methods and mention `@throws SerializationException`?
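A rough sketch of what the requested javadoc could look like. The types here are hypothetical stand-ins, not the PR's code: `SketchSerializationException` stands in for `SerializationException`, principals are reduced to plain name strings, and `LengthPrefixedSerde` is an invented implementation that only illustrates the documented contract.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.kafka.common.errors.SerializationException.
class SketchSerializationException extends RuntimeException {
    SketchSerializationException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical stand-in for the interface under review; principals are plain names here.
interface DocumentedSerdeSketch {

    /**
     * Serialize a principal name into bytes.
     *
     * @param principalName the principal name to serialize
     * @return a buffer ready for reading the serialized bytes
     * @throws SketchSerializationException if the name cannot be serialized
     */
    ByteBuffer serialize(String principalName);

    /**
     * Deserialize a principal name from bytes.
     *
     * @param bytes the serialized principal
     * @return the deserialized principal name
     * @throws SketchSerializationException if the bytes are malformed
     */
    String deserialize(ByteBuffer bytes);
}

// Invented implementation: a 4-byte length prefix followed by UTF-8 bytes.
final class LengthPrefixedSerde implements DocumentedSerdeSketch {

    @Override
    public ByteBuffer serialize(String principalName) {
        if (principalName == null)
            throw new SketchSerializationException("Principal name is null", null);
        byte[] name = principalName.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + name.length);
        buf.putInt(name.length).put(name);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    @Override
    public String deserialize(ByteBuffer bytes) {
        try {
            int length = bytes.getInt();
            if (length < 0 || length > bytes.remaining())
                throw new SketchSerializationException("Invalid length " + length, null);
            byte[] name = new byte[length];
            bytes.get(name);
            return new String(name, StandardCharsets.UTF_8);
        } catch (BufferUnderflowException e) {
            // Surface every failure as the single documented exception type.
            throw new SketchSerializationException("Malformed principal bytes", e);
        }
    }
}
```

Documenting a single exception type lets callers handle every serde failure in one place, whichever implementation is plugged in.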

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+    // Leave the principal null here is ok since we will fail the request during Kafka API handling.
+    val originalPrincipal = if (principalSerde.isDefined)
+      principalSerde.get.deserialize(envelopeRequest.principalData)
+    else
+      null
+
+    val originalClientAddress = InetAddress.getByAddress(envelopeRequest.clientAddress)
+    val originalContext = new RequestContext(originalHeader, connectionId,
+      originalClientAddress, originalPrincipal, listenerName,
+      securityProtocol, context.clientInformation, isPrivilegedListener)
+
+    val envelopeContext = new EnvelopeContext(
+      brokerContext = context,
+      receive.payload)
+
+    new network.RequestChannel.Request(processor = id, context = originalContext,

Review comment:
       nit: `network` prefix is not needed since we are already in this package

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/PrincipalDeserializationFailureException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a kafka principal deserialization failure during request forwarding.
+ */
+public class PrincipalDeserializationFailureException extends AuthorizationException {

Review comment:
       nit: I feel `FailureException` is redundant. Can we just call it `PrincipalDeserializationException`?
   
   Also, I am not sure about this extending `AuthorizationException`. I would consider it more of an invalid request than an authorization failure, though the effect is the same. I think it's probably better to avoid categorizing it and just let it extend `ApiException`.
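To illustrate the hierarchy point, here is a small sketch using hypothetical stand-in classes (the `*Sketch` names are invented and are not the real Kafka exception classes). It shows that an exception extending the general API exception directly is not picked up by code that special-cases authorization failures.

```java
// Hypothetical stand-ins for the exception classes named in the comment.
class ApiExceptionSketch extends RuntimeException {
    ApiExceptionSketch(String message) {
        super(message);
    }
}

class AuthorizationExceptionSketch extends ApiExceptionSketch {
    AuthorizationExceptionSketch(String message) {
        super(message);
    }
}

// Extends the general API exception directly, so it is not categorized
// as an authorization failure.
class PrincipalDeserializationExceptionSketch extends ApiExceptionSketch {
    PrincipalDeserializationExceptionSketch(String message) {
        super(message);
    }
}

final class ErrorClassifierSketch {
    // Invented helper: reports which category a handler would assign to an exception.
    static String classify(RuntimeException e) {
        if (e instanceof AuthorizationExceptionSketch)
            return "authorization";
        if (e instanceof ApiExceptionSketch)
            return "api";
        return "unknown";
    }
}
```

With this layout, `ErrorClassifierSketch.classify(new PrincipalDeserializationExceptionSketch("bad bytes"))` yields `"api"`, while an `AuthorizationExceptionSketch` yields `"authorization"`.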

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {

Review comment:
       nit: we are doing more than building the response here, we are sending it. How about `sendFailedEnvelopeResponse`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle cluster authorization failures
+    if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendResponse(request, None, None, error)
+  }
+
+  private def validateForwardRequest(request: RequestChannel.Request): Boolean = {

Review comment:
       nit: `validatedForwardedRequest`

##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -180,7 +180,6 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
 
     /**
      * Create a new ClientRequest.
-     *

Review comment:
       nit: seems this change was not needed

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle cluster authorization failures
+    if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendResponse(request, None, None, error)
+  }
+
+  private def validateForwardRequest(request: RequestChannel.Request): Boolean = {
+    if (!config.forwardingEnabled || !request.context.fromPrivilegedListener) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // forwarding is not enabled yet, we would not handle the request.
+      closeConnection(request, Collections.emptyMap())
+      false
+    } else if (!authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+      // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
+      buildFailedEnvelopeResponse(request, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      false
+    } else if (!request.header.apiKey.forwardable) {
+      buildFailedEnvelopeResponse(request, Errors.INVALID_REQUEST)
+      false
+    } else if (request.principalSerde.isEmpty) {
+      buildFailedEnvelopeResponse(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE)
+      false
+    } else if (!controller.isActive) {
+      buildFailedEnvelopeResponse(request, Errors.NOT_CONTROLLER)
+      false
+    } else
+      true
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
+      forwardingManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def isForwardingEnabled(request: RequestChannel.Request): Boolean =
+    config.forwardingEnabled && request.principalSerde.isDefined
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
   override def handle(request: RequestChannel.Request): Unit = {
     try {
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
-      request.header.apiKey match {
-        case ApiKeys.PRODUCE => handleProduceRequest(request)
-        case ApiKeys.FETCH => handleFetchRequest(request)
-        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
-        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
-        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
-        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
-        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
-        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
-        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
-        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
-        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
-        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
-        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
-        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
-        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
-        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
-        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
-        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
-        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
-        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
-        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
-        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
-        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
-        case ApiKeys.END_TXN => handleEndTxnRequest(request)
-        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
-        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
-        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
-        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
-        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
-        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
-        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
-        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
-        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
-        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
-        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
-        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
-        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
-        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
-        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
-        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
-        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
-        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
-        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
-        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
-        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
-        case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
-        case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
-        case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
-        case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
-        // Until we are ready to integrate the Raft layer, these APIs are treated as
-        // unexpected and we just close the connection.
-        case ApiKeys.VOTE => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.BEGIN_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.END_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.DESCRIBE_QUORUM => closeConnection(request, util.Collections.emptyMap())
+
+      val isValidRequest = !request.isForwarded || validateForwardRequest(request)

Review comment:
       I think it would be simpler to return early:
   
   ```scala
   if (request.isForwarded && !validateForwardRequest(request))
     return
   ```

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)

Review comment:
       nit: instead of `original`, could we use `forwarded` in these names?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {

Review comment:
       nit: define return type

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);
+
+    KafkaPrincipal deserialize(ByteBuffer bytes);

Review comment:
       > I was under the impression that byte buffer provides more information such as a read position and capacity/limits, which makes the deserialization easier.
   
   Hmm, not sure I get your point. Nothing is simpler than a byte array. The main question is whether we want to expose the actual request buffer to the plugin, especially since we still plan on using it afterwards. The plugin is treated as a trusted component in any case, so it might not make a big difference. Probably we should optimize here for simplicity.
   
   > If given a byte[], I'm afraid they need to convert to byte buffer internally eventually.
   
   That may or may not be true. If it is, users can just use `ByteBuffer.wrap`.
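A minimal sketch of the `byte[]` variant, assuming a made-up wire format (two length-prefixed UTF-8 strings for the principal type and name). `ByteArraySerdeSketch` and its methods are hypothetical and not part of the PR; only `ByteBuffer.wrap` and the other `java.nio` calls are real API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical serde working on byte[]; the format is invented for illustration.
final class ByteArraySerdeSketch {

    // Write the principal type and name as two length-prefixed UTF-8 strings.
    static byte[] serialize(String principalType, String name) {
        byte[] typeBytes = principalType.getBytes(StandardCharsets.UTF_8);
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + typeBytes.length + 4 + nameBytes.length);
        buf.putInt(typeBytes.length).put(typeBytes);
        buf.putInt(nameBytes.length).put(nameBytes);
        return buf.array(); // heap buffer, filled exactly to capacity
    }

    // An implementation that prefers buffer semantics wraps the array itself.
    // The wrapper shares the array's contents but has its own position and limit.
    static String[] deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String principalType = readString(buf);
        String name = readString(buf);
        return new String[] {principalType, name};
    }

    private static String readString(ByteBuffer buf) {
        byte[] out = new byte[buf.getInt()];
        buf.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }
}
```

Because the plugin creates the wrapper, the read position it advances belongs to the plugin's own `ByteBuffer`, not to any buffer the broker keeps using afterwards.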



