Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/16 19:50:59 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

abbccdda opened a new pull request #9600:
URL: https://github.com/apache/kafka/pull/9600


   To make sure a forwarded request can be handled properly by the controller, the broker should, when forwarding is enabled, fetch the controller's API versions and apply them as joint constraints on the versions it advertises to the client.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r555419244



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -53,4 +53,18 @@ public String toString() {
             ", maxVersion= " + maxVersion +
             ")";
     }
+
+    public static ApiVersion versionsInCommon(ApiKeys apiKey, ApiVersion supportedVersions,
+                                              short minAllowedVersion, short maxAllowedVersion) {
+        if (supportedVersions == null)
+            throw new UnsupportedVersionException("The broker does not support " + apiKey);
+
+        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
+        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
+        if (minVersion > maxVersion)
+            throw new UnsupportedVersionException("The broker does not support " + apiKey +

Review comment:
       Not totally sure this gets thrown in the right place. If there are no overlapping versions, perhaps we should leave the api out of the result. Perhaps that suggests we should use this definition:
   ```java
   // Return common api versions or empty if there are none
   public Optional<ApiVersion> intersect(ApiVersion other);
   ```
   Then we can let the caller decide how to handle this case.
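
A minimal sketch of what the suggested `intersect` could look like. `VersionRange` is a hypothetical stand-in for `ApiVersion` that assumes only an inclusive min/max pair; it is an illustration, not the code from the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for ApiVersion: an inclusive [minVersion, maxVersion] range.
final class VersionRange {
    final short minVersion;
    final short maxVersion;

    VersionRange(short minVersion, short maxVersion) {
        this.minVersion = minVersion;
        this.maxVersion = maxVersion;
    }

    // Return the common versions, or empty if the ranges do not overlap.
    Optional<VersionRange> intersect(VersionRange other) {
        if (other == null)
            return Optional.empty();
        short min = (short) Math.max(minVersion, other.minVersion);
        short max = (short) Math.min(maxVersion, other.maxVersion);
        if (min > max)
            return Optional.empty();
        return Optional.of(new VersionRange(min, max));
    }
}
```

With this shape, a caller building an `ApiVersions` response could skip an API whose intersection is empty, while a caller choosing a request version could throw instead.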







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557825070



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -139,8 +138,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
   ): Unit = {
-    def responseCallback(response: AbstractResponse): Unit = {
-      sendForwardedResponse(request, response)
+    def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = {
+      responseEither match {
+        case Left(response) => sendForwardedResponse(request, response)
+        case Right(error) => closeConnection(request, Collections.singletonMap(error, 1))

Review comment:
       Sounds good. I prefer using `info` here since it should be a rare case.







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r556245337



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -53,4 +53,18 @@ public String toString() {
             ", maxVersion= " + maxVersion +
             ")";
     }
+
+    public static ApiVersion versionsInCommon(ApiKeys apiKey, ApiVersion supportedVersions,

Review comment:
       I looked up the usages of this API: external callers pass in the reference `min` and `max` versions. The current signature is at least the most straightforward refactoring, since it does not require creating another `ApiVersion` instance.







[GitHub] [kafka] abbccdda merged pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #9600:
URL: https://github.com/apache/kafka/pull/9600


   





[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r558469889



##########
File path: clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
##########
@@ -125,13 +126,6 @@ public void testUsableVersionCalculationNoKnownVersions() {
             () -> versions.latestUsableVersion(ApiKeys.FETCH));
     }
 
-    @Test

Review comment:
       Hmm.. I think my suggestion about `latestUsableVersion(ApiKeys)` was off if we had to remove this. I think I had failed to take into account that `NodeApiVersions` represented the versions supported by the remote node, so the intersection was in fact necessary. Sorry about that.
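
To illustrate why the intersection is needed, here is a small sketch under simplified assumptions: versions are inclusive ranges of shorts, and `UsableVersion` is a hypothetical helper, not a Kafka class. The remote node's maximum is not necessarily usable by the local side.

```java
// Hypothetical, simplified model: versions are inclusive [min, max] ranges of shorts.
final class UsableVersion {
    // Latest version that both the local client and the remote node can speak.
    static short latestUsable(short localMin, short localMax, short remoteMin, short remoteMax) {
        short min = (short) Math.max(localMin, remoteMin);
        short max = (short) Math.min(localMax, remoteMax);
        if (min > max)
            throw new IllegalStateException("No version in common");
        return max;
    }
}
```

If the local side supports versions 0 to 3 and the remote node supports 0 to 5, the usable version is 3; taking the remote node's latest version on its own would pick a version the local side cannot serialize.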

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -190,35 +209,41 @@ class BrokerToControllerRequestThread(
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
-      } else if (activeController.isDefined) {
-        requestIter.remove()
-        return Some(RequestAndCompletionHandler(
-          time.milliseconds(),
-          activeController.get,
-          request.request,
-          handleResponse(request)
-        ))
+      } else {
+        val controllerAddress = activeControllerAddress()
+        if (controllerAddress.isDefined) {
+          requestIter.remove()
+          return Some(RequestAndCompletionHandler(
+            time.milliseconds(),
+            controllerAddress.get,
+            request.request,
+            handleResponse(request)
+          ))
+        }
       }
     }
     None
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
     if (response.wasDisconnected()) {
-      activeController = None
+      updateControllerAddress(null)
       requestQueue.putFirst(request)
     } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
       // just close the controller connection and wait for metadata cache update in doWork
-      networkClient.disconnect(activeController.get.idString)
-      activeController = None
+      activeControllerAddress().foreach(controllerAddress => {

Review comment:
       nit: a little more idiomatic
   ```scala
   foreach { controllerAddress =>
   }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -139,8 +138,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
   ): Unit = {
-    def responseCallback(response: AbstractResponse): Unit = {
-      sendForwardedResponse(request, response)
+    def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = {
+      responseEither match {
+        case Left(response) => sendForwardedResponse(request, response)
+        case Right(error) =>
+          if (error == Errors.UNSUPPORTED_VERSION)

Review comment:
       It's a little odd to have the `if` check here since we will close the connection regardless of the error. That's one reason I thought `Option[AbstractResponse]` might be clearer. The `None` could be treated as implying an unsupported version.
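
A rough sketch of the optional-based callback, written in Java for illustration. `ForwardingCallbackSketch` and the string "responses" are hypothetical simplifications; the real callback works with `AbstractResponse` and the request channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical, simplified forwarding callback: responses are plain strings.
final class ForwardingCallbackSketch {
    // Records what the handler did, so the behavior can be inspected.
    final List<String> actions = new ArrayList<>();

    // An empty Optional stands for "the controller could not handle this version",
    // so the handler needs no error-code check before closing the connection.
    Consumer<Optional<String>> responseCallback() {
        return responseOpt -> {
            if (responseOpt.isPresent())
                actions.add("send:" + responseOpt.get());
            else
                actions.add("close");
        };
    }
}
```

Because the empty case carries no error code, the type itself documents that only one failure mode exists.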

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi
         return apiKeys;
     }
 
+    public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(final byte minMagic,

Review comment:
       nit: maybe add a short javadoc?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -227,4 +230,7 @@ public ApiVersion apiVersion(ApiKeys apiKey) {
         return supportedVersions.get(apiKey);
     }
 
+    public Map<ApiKeys, ApiVersion> fullApiVersions() {

Review comment:
       How about `allSupportedApiVersions`?

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
       Yeah, but that suggests a generality to the API that doesn't exist. Only one error can possibly be returned here.







[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557783391



##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-        ApiVersion usableVersion = supportedVersions.get(apiKey);
-        if (usableVersion == null)
-            throw new UnsupportedVersionException("The broker does not support " + apiKey);
-        return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+        return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion);
     }
 
-    private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
-                                      short minAllowedVersion, short maxAllowedVersion) {
-        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
-        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
-        if (minVersion > maxVersion)
+    private short latestUsableVersion(ApiKeys apiKey,
+                                      ApiVersion supportedVersions,
+                                      short minAllowedVersion,
+                                      short maxAllowedVersion) {
+        if (supportedVersions == null)

Review comment:
       nit: since we moved the null check here, why don't we remove the parameter as well and call `supportedVersions.get(apiKey)` here?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi
         return apiKeys;
     }
 
+    public static ApiVersionsResponseKeyCollection commonApiVersionsWithActiveController(final byte minMagic,

Review comment:
       Maybe `intersectControllerApiVersions`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {

Review comment:
       We could probably simplify this so that it takes a single `ApiVersion` parameter?
   
   By the way, the implementation above, `latestUsableVersion(ApiKeys apiKey)`, looks odd since it basically does an intersection of the latest supported version with itself. A little helper (say `latestSupportedOrThrow`) might simplify this.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -140,6 +143,16 @@ class BrokerToControllerChannelManager(
       callback
     ))
   }
+
+  def controllerApiVersions(): Option[NodeApiVersions] =
+    requestThread.activeControllerAddress() match {
+      case Some(activeController) =>
+        if (activeController.id() == config.brokerId)
+          Some(currentNodeApiVersions)
+        else
+          Option(apiVersions.get(activeController.idString()))
+      case None => None

Review comment:
       Seems like we can replace the `match` with a `flatMap`?
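
The same idea expressed with Java's `Optional.flatMap`, as a sketch only. `ControllerVersions` is hypothetical: node ids are ints and "API versions" are plain strings standing in for `NodeApiVersions`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-ins: controller ids are ints, API versions are strings.
final class ControllerVersions {
    private final int brokerId;
    private final String localVersions;
    private final Map<Integer, String> knownVersions = new HashMap<>();

    ControllerVersions(int brokerId, String localVersions) {
        this.brokerId = brokerId;
        this.localVersions = localVersions;
    }

    // Record the versions learned from a remote node.
    void learn(int nodeId, String versions) {
        knownVersions.put(nodeId, versions);
    }

    // flatMap collapses "no active controller" and "controller with unknown
    // versions" into a single empty result, with no explicit match on None.
    Optional<String> controllerApiVersions(Optional<Integer> activeController) {
        return activeController.flatMap(id -> {
            if (id == brokerId)
                return Optional.of(localVersions);
            return Optional.ofNullable(knownVersions.get(id));
        });
    }
}
```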

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -205,20 +226,20 @@ class BrokerToControllerRequestThread(
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
     if (response.wasDisconnected()) {
-      activeController = None
+      updateControllerAddress(None)
       requestQueue.putFirst(request)
     } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
       // just close the controller connection and wait for metadata cache update in doWork
-      networkClient.disconnect(activeController.get.idString)
-      activeController = None
+      networkClient.disconnect(activeControllerAddress().get.idString)

Review comment:
       This is another slippery looking case. Can we just rewrite this as a `foreach` so that we don't need to worry about it?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -190,11 +211,11 @@ class BrokerToControllerRequestThread(
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
-      } else if (activeController.isDefined) {
+      } else if (activeControllerAddress().isDefined) {
         requestIter.remove()
         return Some(RequestAndCompletionHandler(
           time.milliseconds(),
-          activeController.get,
+          activeControllerAddress().get,

Review comment:
       The usage is a tad suspicious because the atomic reference suggests that the value could change. I guess we are ok because the value will only be overwritten in the same thread that is calling `generateRequests`, but it might be worth rewriting this part anyway. For example:
   ```scala
   } else {
     val controllerAddress = activeControllerAddress()
     if (controllerAddress.isDefined) {
   ...
   ```
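
The read-once pattern can be sketched in Java as follows. `RequestThreadSketch` is a hypothetical simplification in which the controller address is a plain string held in an `AtomicReference`; it is not the code from the PR.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified request thread: the controller address is a plain string.
final class RequestThreadSketch {
    // Holds null while no controller is known.
    private final AtomicReference<String> activeController = new AtomicReference<>();

    void updateControllerAddress(String address) {
        activeController.set(address);
    }

    Optional<String> activeControllerAddress() {
        return Optional.ofNullable(activeController.get());
    }

    // Read the reference once; the check and the use then see the same snapshot
    // even if another thread updates the reference in between.
    Optional<String> nextDestination() {
        Optional<String> controllerAddress = activeControllerAddress();
        if (controllerAddress.isPresent())
            return Optional.of("send to " + controllerAddress.get());
        return Optional.empty();
    }
}
```

Calling `activeControllerAddress()` twice, once for the check and once for the `get`, would leave a window in which the second read could come back empty.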

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -98,30 +100,40 @@ class ForwardingManagerImpl(
         val envelopeError = envelopeResponse.error()
         val requestBody = request.body[AbstractRequest]
 
-        val response = if (envelopeError != Errors.NONE) {
-          // An envelope error indicates broker misconfiguration (e.g. the principal serde
-          // might not be defined on the receiving broker). In this case, we do not return
-          // the error directly to the client since it would not be expected. Instead we
-          // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
-          // on the broker.
-          debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
-          requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+        // Unsupported version indicates an incompatibility between controller and client API versions. The

Review comment:
       It may be helpful to mention that this can happen because the controller changed after a connection was established.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -139,8 +138,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
   ): Unit = {
-    def responseCallback(response: AbstractResponse): Unit = {
-      sendForwardedResponse(request, response)
+    def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = {
+      responseEither match {
+        case Left(response) => sendForwardedResponse(request, response)
+        case Right(error) => closeConnection(request, Collections.singletonMap(error, 1))

Review comment:
       A debug message would be helpful here so that we know why the connection was closed. I think we might actually prefer to use `emptyMap()` here since the unsupported version error would be misleading.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -148,42 +149,49 @@ object ApiVersion {
 
   def apiVersionsResponse(throttleTimeMs: Int,
                           maxMagic: Byte,
-                          latestSupportedFeatures: Features[SupportedVersionRange]): ApiVersionsResponse = {
+                          latestSupportedFeatures: Features[SupportedVersionRange],
+                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
     apiVersionsResponse(
       throttleTimeMs,
       maxMagic,
       latestSupportedFeatures,
       Features.emptyFinalizedFeatures,
-      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH
+      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+      controllerApiVersions
     )
   }
 
   def apiVersionsResponse(throttleTimeMs: Int,
                           maxMagic: Byte,
                           latestSupportedFeatures: Features[SupportedVersionRange],
                           finalizedFeatures: Features[FinalizedVersionRange],
-                          finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
-    val apiKeys = ApiVersionsResponse.defaultApiKeys(maxMagic)
+                          finalizedFeaturesEpoch: Long,
+                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
+    val apiKeys = controllerApiVersions match {
+      case None => ApiVersionsResponse.defaultApiKeys(maxMagic)
+      case Some(controllerApiVersion) => ApiVersionsResponse.commonApiVersionsWithActiveController(
+        maxMagic, controllerApiVersion.fullApiVersions())
+    }
+
     if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE &&
       throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME)
-      return new ApiVersionsResponse(
+      new ApiVersionsResponse(
         ApiVersionsResponse.createApiVersionsResponseData(
           DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
           Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
           apiKeys,
           latestSupportedFeatures,
           finalizedFeatures,
+          finalizedFeaturesEpoch))
+    else

Review comment:
       nit: since this is a big block, could we add the braces?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -169,11 +182,19 @@ class BrokerToControllerRequestThread(
 ) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
-  private var activeController: Option[Node] = None
+  private val activeController = new AtomicReference[Option[Node]](None)

Review comment:
       nit: an atomic reference of Option is a little strange. Could we just use null and change the code to use `Option(activeController.get())`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1756,18 +1758,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else
+          None
+
+        val apiVersionsResponse =
+          finalizedFeaturesOpt match {
+            case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
+              requestThrottleMs,
+              config.interBrokerProtocolVersion.recordVersion.value,
+              supportedFeatures,
+              finalizedFeatures.features,
+              finalizedFeatures.epoch,
+              controllerApiVersions)
+            case None => ApiVersion.apiVersionsResponse(
+              requestThrottleMs,
+              config.interBrokerProtocolVersion.recordVersion.value,
+              supportedFeatures,
+              controllerApiVersions)
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
       Since we are not using the `Error` from the result, maybe `Option[AbstractResponse]` would be a better type.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1756,18 +1758,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else

Review comment:
       nit: add braces

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients;
+package org.apache.kafka.common.protocol;

Review comment:
       Ok. I don't have a strong argument to keep it where it is today I guess.







[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r553701160



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1790,17 +1790,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else
+          None
+
+        if (isForwardingEnabled(request) && controllerApiVersions.isEmpty) {

Review comment:
       This seems a little severe. Not all apis need to be coordinated with the controller. We have to handle the case when we receive a version that the current controller cannot handle anyway, so I think it's ok to make this a best-effort intersection and return just the broker APIs if the controller APIs are not yet known.
   
   The tricky case for this PR is when the controller changed or we learned about new API version support after a client had already connected to the broker and sent `ApiVersions`. In this case, we have to detect version incompatibility dynamically when we try to forward the request. I might be missing something, but the current patch doesn't seem to handle this. Maybe the simplest option is to let the controller return UNSUPPORTED_VERSION in the envelope response if the header indicates an api or version that it does not support. Then the question is whether this error should be sent back to the client, but that would be a little surprising. 
   
   Consider this sequence:
   
   1. Client connects and sends ApiVersions request
   2. Current controller supports AlterConfig v0-4, so that is what the broker indicates in the ApiVersions response
   3. New controller is elected and only supports AlterConfig v0-3
   4. Client sends AlterConfig v4
   
   Now what happens? An unsupported version error here would be treated as fatal by the client. I think we agreed that instead of sending back the error, the broker would just disconnect. This would force a reconnect and a refresh of the API versions.
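
The sequence above can be modeled with a small sketch. Everything here is a hypothetical simplification: a single API (AlterConfig) with minimum version 0, and `ForwardingBrokerSketch` standing in for the forwarding broker.

```java
// Hypothetical, simplified model of the sequence above: one API, min version 0.
final class ForwardingBrokerSketch {
    private final short brokerMax;
    private short controllerMax; // max AlterConfig version of the current controller

    ForwardingBrokerSketch(short brokerMax, short controllerMax) {
        this.brokerMax = brokerMax;
        this.controllerMax = controllerMax;
    }

    // A new controller is elected with possibly different version support.
    void electController(short newControllerMax) {
        controllerMax = newControllerMax;
    }

    // ApiVersions response: advertise only what both broker and controller support.
    short advertisedMaxVersion() {
        return (short) Math.min(brokerMax, controllerMax);
    }

    // Forward a request. Returns false when the broker closes the connection
    // instead of surfacing a fatal unsupported-version error to the client.
    boolean forward(short requestVersion) {
        return requestVersion <= controllerMax;
    }
}
```

In this model the client first learns a maximum of 4; after a controller supporting only up to 3 is elected, `forward((short) 4)` returns false, which stands for the disconnect. On reconnect the client asks again, learns 3, and the retried request goes through.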
   
   










[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r558484623



##########
File path: clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
##########
@@ -125,13 +126,6 @@ public void testUsableVersionCalculationNoKnownVersions() {
             () -> versions.latestUsableVersion(ApiKeys.FETCH));
     }
 
-    @Test

Review comment:
       No worry, I will revert it







[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r555415827



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients;
+package org.apache.kafka.common.protocol;

Review comment:
       Why move this class?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1790,17 +1790,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else
+          None
+
+        if (isForwardingEnabled(request) && controllerApiVersions.isEmpty) {
+          // If the controller api version is missing and we already enabled feature support,
+          // we need to let ApiVersion request retry by sending unsupported version.
+          apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)

Review comment:
       As mentioned previously, I think we want to relax this. It's fine to return the broker versions if the controller versions are unknown.
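
A best-effort version of this could look roughly like the following sketch. It is not the patch's implementation: the map-of-arrays representation, the `forwardable` set and the `advertised` method are assumptions made for illustration, with each `short[]` holding `{minVersion, maxVersion}`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class BestEffortApiVersions {
    // Compute the versions to advertise in the ApiVersions response.
    // Keys are API names; values are {minVersion, maxVersion} (hypothetical representation).
    static Map<String, short[]> advertised(Map<String, short[]> broker,
                                           Optional<Map<String, short[]>> controller,
                                           Set<String> forwardable) {
        Map<String, short[]> result = new HashMap<>(broker);
        if (!controller.isPresent())
            return result; // controller versions unknown: return the broker versions as-is

        for (String api : forwardable) {
            short[] b = broker.get(api);
            if (b == null)
                continue; // the broker does not expose this API at all
            short[] c = controller.get().get(api);
            if (c == null) {
                result.remove(api); // controller cannot handle this forwardable API
                continue;
            }
            short min = (short) Math.max(b[0], c[0]);
            short max = (short) Math.min(b[1], c[1]);
            if (min > max)
                result.remove(api); // no overlapping versions
            else
                result.put(api, new short[] {min, max});
        }
        return result;
    }
}
```

APIs that are not forwarded keep the broker's range untouched, which matches the observation that not all APIs need to be coordinated with the controller.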

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -171,6 +184,20 @@ class BrokerToControllerRequestThread(
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private var activeController: Option[Node] = None
 
+  private val lock = new ReentrantReadWriteLock
+
+  def activeControllerAddress(): Option[Node] = {
+    CoreUtils.inReadLock(lock) {

Review comment:
       This might be overkill. Perhaps we could make `activeController` volatile or an atomic reference?
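
For comparison, a lock-free variant might look like the sketch below. It is written in Java for illustration (the file under review is Scala), uses a plain `String` address as a stand-in for `Node`, and the class and method names other than `activeControllerAddress` are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

final class ActiveControllerHolder {
    // Optional.empty() means that no active controller is known yet.
    private final AtomicReference<Optional<String>> activeController =
        new AtomicReference<>(Optional.empty());

    // Safe to call from any thread; no read lock is needed for a single reference read.
    Optional<String> activeControllerAddress() {
        return activeController.get();
    }

    // Called by the request thread once it discovers the controller.
    void updateController(String address) {
        activeController.set(Optional.of(address));
    }

    // Called when the controller is lost, for example after a disconnect.
    void resetController() {
        activeController.set(Optional.empty());
    }
}
```

Since readers only need the latest published value and there is no compound read-modify-write, a single atomic (or volatile) reference gives the same visibility guarantee as the read-write lock.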

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -109,7 +113,14 @@ class ForwardingManagerImpl(
         } else {
           parseResponse(envelopeResponse.responseData, requestBody, request.header)
         }
-        responseCallback(response)
+
+        // Unsupported version indicates an incompatibility between controller and client API versions. The
+        // forwarding broker should close the connection with the client and let it reinitialize the connection
+        // and refresh the controller API versions.
+        if (response.errorCounts().containsKey(Errors.UNSUPPORTED_VERSION)) {

Review comment:
       Taking into account #9850, I think we should be checking `envelopeError`. If the broker could not parse the embedded request, then it would not be able to include an embedded response.

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +80,8 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: AbstractResponse => Unit,

Review comment:
       I think we could generalize the response callback and avoid the additional callback argument. Perhaps we could return `Either[Response, Exception]`. Then in `KafkaApis`, we can log an error in the exceptional case and close the connection.
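
As a rough illustration of the single-callback shape, here is a Java sketch. Java has no built-in `Either`, so `ForwardResult` below is a hypothetical stand-in, and `describe` only mimics what the caller in `KafkaApis` might do with each case.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical result type: holds either a response or an error, never both.
final class ForwardResult<R> {
    private final R response;
    private final Exception error;

    private ForwardResult(R response, Exception error) {
        this.response = response;
        this.error = error;
    }

    static <R> ForwardResult<R> success(R response) {
        return new ForwardResult<>(response, null);
    }

    static <R> ForwardResult<R> failure(Exception error) {
        return new ForwardResult<>(null, Objects.requireNonNull(error));
    }

    // Run exactly one of the two handlers, depending on which side is set.
    void handle(Consumer<R> onResponse, Consumer<Exception> onError) {
        if (error != null)
            onError.accept(error);
        else
            onResponse.accept(response);
    }
}

final class SingleCallbackSketch {
    // Stand-in for the caller: send the response, or log the error and close the connection.
    static String describe(ForwardResult<String> result) {
        StringBuilder action = new StringBuilder();
        result.handle(
            response -> action.append("send response: ").append(response),
            error -> action.append("log and close connection: ").append(error.getMessage()));
        return action.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(ForwardResult.success("AlterConfigResponse")));
        System.out.println(describe(ForwardResult.<String>failure(
            new IllegalStateException("controller does not support this version"))));
    }
}
```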

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -53,4 +53,18 @@ public String toString() {
             ", maxVersion= " + maxVersion +
             ")";
     }
+
+    public static ApiVersion versionsInCommon(ApiKeys apiKey, ApiVersion supportedVersions,
+                                              short minAllowedVersion, short maxAllowedVersion) {
+        if (supportedVersions == null)
+            throw new UnsupportedVersionException("The broker does not support " + apiKey);
+
+        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
+        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
+        if (minVersion > maxVersion)
+            throw new UnsupportedVersionException("The broker does not support " + apiKey +

Review comment:
       Not totally sure this gets thrown in the right place. If there are no overlapping versions, perhaps we should leave the api out of the result. Perhaps that suggests we should use this definition:
   ```java
   // Return common api versions or empty if there are none
   public Optional<ApiVersion> intersect(ApiVersion other);
   ```
   Then we can let the caller decide how to handle this case.
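
One possible shape for such a method is sketched below. `ApiVersionSketch` is a hypothetical stand-in for `ApiVersion` with only the fields needed here, and throwing on mismatched API keys is an assumption rather than something settled in this review.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for ApiVersion.
final class ApiVersionSketch {
    final short apiKey;
    final short minVersion;
    final short maxVersion;

    ApiVersionSketch(int apiKey, int minVersion, int maxVersion) {
        this.apiKey = (short) apiKey;
        this.minVersion = (short) minVersion;
        this.maxVersion = (short) maxVersion;
    }

    // Return the common version range, or empty if the two ranges do not overlap.
    Optional<ApiVersionSketch> intersect(ApiVersionSketch other) {
        if (other.apiKey != apiKey)
            throw new IllegalArgumentException(
                "Cannot intersect versions of different APIs: " + apiKey + " and " + other.apiKey);

        short min = (short) Math.max(minVersion, other.minVersion);
        short max = (short) Math.min(maxVersion, other.maxVersion);
        return min > max
            ? Optional.empty()
            : Optional.of(new ApiVersionSketch(apiKey, min, max));
    }
}
```

A caller building the `ApiVersions` response could then drop the API when the result is empty, while a client choosing a request version could throw `UnsupportedVersionException` instead.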

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -53,4 +53,18 @@ public String toString() {
             ", maxVersion= " + maxVersion +
             ")";
     }
+
+    public static ApiVersion versionsInCommon(ApiKeys apiKey, ApiVersion supportedVersions,

Review comment:
       nit: we could simplify this a little bit. I'd suggest a non-static method. Maybe something like this:
   ```java
   public ApiVersion intersect(ApiVersion other) {
     // verify this.apiKey matches other.apiKey
     // return intersection as implemented below
   }
   ```







[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r558529073



##########
File path: core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
##########
@@ -25,6 +27,15 @@ import scala.jdk.CollectionConverters._
 
 abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
 
+  // Configure control plane listener to make sure we have separate listeners from client,
+  // in order to avoid returning Envelope API version.

Review comment:
       It is a little strange for the test case to have a dependence on this. It seems we should be testing the inter-broker listener as well? I think I would instead try to modify failing assertions so that they assert the right thing depending on the listener config. 







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r556743113



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients;
+package org.apache.kafka.common.protocol;

Review comment:
       I want to make `ApiVersion` usable by common/message/ApiVersionsResponse







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r556743113



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients;
+package org.apache.kafka.common.protocol;

Review comment:
       I want to make `versionsInCommon` usable by common/message/ApiVersionsResponse







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557810929



##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-        ApiVersion usableVersion = supportedVersions.get(apiKey);
-        if (usableVersion == null)
-            throw new UnsupportedVersionException("The broker does not support " + apiKey);
-        return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+        return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion);
     }
 
-    private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
-                                      short minAllowedVersion, short maxAllowedVersion) {
-        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
-        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
-        if (minVersion > maxVersion)
+    private short latestUsableVersion(ApiKeys apiKey,
+                                      ApiVersion supportedVersions,
+                                      short minAllowedVersion,
+                                      short maxAllowedVersion) {
+        if (supportedVersions == null)

Review comment:
       Yeah, actually we could remove the private function since it now has a single caller.







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557823644



##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
       I'm inclined to keep it as it is, since the current return type gives the caller a root cause message.







[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r558607703



##########
File path: core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
##########
@@ -45,10 +47,13 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
     } finally socket.close()
   }
 
-  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse): Unit = {
-    val enabledPublicApis = ApiKeys.enabledApis()
+  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
+    val expectedApis = ApiKeys.enabledApis()
+    if (listenerName.equals(controlPlaneListenerName) ) {

Review comment:
       nit: can use `==` in scala







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r553775686



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1790,17 +1790,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else
+          None
+
+        if (isForwardingEnabled(request) && controllerApiVersions.isEmpty) {

Review comment:
       I think it relates to the timing of the forwarding. When the controller switches and certain ApiVersions are updated, the forwarding response will contain an unsupported version exception. At that point, we would disconnect the client. We have two options here to trigger the disconnect:
   
   1. when we detect an unsupported version exception in the forwarding response
   2. when we detect that the new ApiVersion set is different from the previous one
   
   I personally think option 1 is more accurate, but it has a downside: we couldn't tell whether it was truly an incompatible admin client trying to connect. Option 2 may have a wider impact than necessary, since it could disconnect clients that only use unaffected RPCs.
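
The difference between the two triggers can be sketched as two predicates. Everything here is illustrative: the class and method names are hypothetical, errors are modelled as plain strings, and the `CreateTopics` version range is made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DisconnectTriggerSketch {
    // Option 1: disconnect only when a forwarded request actually failed
    // with an unsupported version error.
    static boolean disconnectOnResponse(String forwardedError) {
        return "UNSUPPORTED_VERSION".equals(forwardedError);
    }

    // Option 2: disconnect whenever the controller's advertised versions changed at all.
    // Values are [minVersion, maxVersion] lists so that Map.equals compares by content.
    static boolean disconnectOnVersionChange(Map<String, List<Integer>> previous,
                                             Map<String, List<Integer>> current) {
        return !previous.equals(current);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = new HashMap<>();
        previous.put("AlterConfig", Arrays.asList(0, 4));
        previous.put("CreateTopics", Arrays.asList(0, 6));

        // The new controller only narrows AlterConfig; CreateTopics is unchanged.
        Map<String, List<Integer>> current = new HashMap<>(previous);
        current.put("AlterConfig", Arrays.asList(0, 3));

        // A client that only sends CreateTopics sees no error under option 1,
        // but would still be disconnected under option 2.
        System.out.println(disconnectOnResponse("NONE"));
        System.out.println(disconnectOnVersionChange(previous, current));
    }
}
```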
   



