Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/19 02:22:00 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10157: MINOR: Raft request thread should discover api versions

hachikuji opened a new pull request #10157:
URL: https://github.com/apache/kafka/pull/10157


   We do not plan to rely on the IBP in order to determine API versions for raft requests. Instead, we want to discover them through the ApiVersions API. This patch enables the flag to do so.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875328



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 
     @Override
     public void close() {
-        log.close();

Review comment:
       This has since been done via a separate PR that was already merged.







[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579707786



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {

Review comment:
       It's a good suggestion. I think what we want is for `ClientResponse` to expose something like `Either[AbstractResponse, Exception]`. This would prevent bugs like this in the future since it forces the code to unwrap the response. I'm willing to give it a shot here, but I suspect it will cause a much bigger diff. My intent here was to try and keep the PR small so that we could get it into 2.8. Let me know what you think.
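
To illustrate the `Either`-style idea in the comment above, here is a minimal Java sketch. It is not the actual `ClientResponse` API: `ResponseOrFailure`, `success`, `failure`, and `fold` are hypothetical names introduced only for illustration. The point is that a caller cannot read the response body without also supplying a handler for the failure case.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical result type: holds either a response body or a failure, never both. */
final class ResponseOrFailure<R> {
    private final R response;         // non-null only on success
    private final Exception failure;  // non-null only on failure

    private ResponseOrFailure(R response, Exception failure) {
        this.response = response;
        this.failure = failure;
    }

    static <R> ResponseOrFailure<R> success(R response) {
        return new ResponseOrFailure<>(Objects.requireNonNull(response), null);
    }

    static <R> ResponseOrFailure<R> failure(Exception failure) {
        return new ResponseOrFailure<>(null, Objects.requireNonNull(failure));
    }

    /** The only accessor: callers must provide a handler for both outcomes. */
    <T> T fold(Function<? super R, ? extends T> onSuccess,
               Function<? super Exception, ? extends T> onFailure) {
        if (failure != null) {
            return onFailure.apply(failure);
        }
        return onSuccess.apply(response);
    }
}
```

With a type like this, a completion handler that casts the body without first checking for an authentication or version error would no longer compile, which is the class of bug the comment is concerned about.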







[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-782330736


   We are missing the handling of authentication and unsupported version exceptions in BrokerToControllerChannelManager. I am working on the test cases now and will submit an update shortly.





[GitHub] [kafka] jsancio commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579892241



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 
     @Override
     public void close() {
-        log.close();

Review comment:
       The removal was done here: https://github.com/apache/kafka/pull/10168
   
   Both the `RaftManager` and `KafkaRaftClient` were closing the `ReplicatedLog`. It was decided that the `RaftManager` owns the log and should close it instead of the `KafkaRaftClient`.
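
The ownership rule described above can be sketched in isolation. This is a simplified illustration, not the Kafka code: `CountingLog`, `LogUser`, and `LogOwner` are hypothetical stand-ins for the log, the client that uses it, and the manager that owns it.

```java
/** Hypothetical log that counts how many times it has been closed. */
final class CountingLog implements AutoCloseable {
    int closeCount = 0;

    @Override
    public void close() {
        closeCount++;
    }
}

/** Uses the log but does not own it, so close() leaves the log alone. */
final class LogUser implements AutoCloseable {
    private final CountingLog log;

    LogUser(CountingLog log) {
        this.log = log;
    }

    @Override
    public void close() {
        // Release only resources this class created; the log is not one of them.
    }
}

/** Creates the log, so it alone is responsible for closing it. */
final class LogOwner implements AutoCloseable {
    final CountingLog log = new CountingLog();
    private final LogUser user = new LogUser(log);

    @Override
    public void close() {
        user.close(); // shut down the dependent component first
        log.close();  // then close the log exactly once
    }
}
```

Closing the owner closes the log once; if the user also closed it, the count would be two, which is the double close the comment describes.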







[GitHub] [kafka] chia7712 commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579977803



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -124,8 +124,18 @@ class KafkaNetworkChannel(
     }
 
     def onComplete(clientResponse: ClientResponse): Unit = {
-      val response = if (clientResponse.authenticationException != null) {
-        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      val response = if (clientResponse.versionMismatch != null) {
+        error(s"Request $request failed due to unsupported version error",
+          clientResponse.authenticationException)

Review comment:
       It should pass `clientResponse.versionMismatch` rather than `clientResponse.authenticationException`.







[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781825379


   > Okay. I guess we need another KIP that explains how to upgrade a cluster and to determine when it is safe to enable the RaftClient on all of the brokers of a cluster.
   
   Yes, a KIP describing the upgrade process will be necessary. I think api version negotiation is probably the least of our concerns 😆.





[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-782725727


   I opened a separate PR for the `close()` bug fix: https://github.com/apache/kafka/pull/10168





[GitHub] [kafka] abbccdda commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579749435



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 
     @Override
     public void close() {
-        log.close();

Review comment:
       Why are we removing this?







[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579707786



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {

Review comment:
       It's a good suggestion. I think what we want is for `ClientResponse` to expose something like `Either[AbstractResponse, Errors]`. This would prevent bugs like this in the future since it forces the code to unwrap the response. I'm willing to give it a shot here, but I suspect it will cause a much bigger diff. My intent here was to try and keep the PR small so that we could get it into 2.8. Let me know what you think.







[GitHub] [kafka] hachikuji merged pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10157:
URL: https://github.com/apache/kafka/pull/10157


   





[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875266



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
     }
 
     def onComplete(clientResponse: ClientResponse): Unit = {
-      val response = if (clientResponse.authenticationException != null) {
-        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      val response = if (clientResponse.versionMismatch != null) {
+        errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+      } else if (clientResponse.authenticationException != null) {
+        // For now we treat authentication errors as retriable. We use the
+        // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+        // Note that `BrokerToControllerChannelManager` will still log the
+        // authentication errors so that users have a chance to fix the problem.
+        errorResponse(request.data, Errors.NETWORK_EXCEPTION)

Review comment:
       Makes sense. Let's file a JIRA to improve it as you suggested here.







[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579959527



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
     }
 
     def onComplete(clientResponse: ClientResponse): Unit = {
-      val response = if (clientResponse.authenticationException != null) {
-        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      val response = if (clientResponse.versionMismatch != null) {
+        errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+      } else if (clientResponse.authenticationException != null) {
+        // For now we treat authentication errors as retriable. We use the
+        // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+        // Note that `BrokerToControllerChannelManager` will still log the
+        // authentication errors so that users have a chance to fix the problem.
+        errorResponse(request.data, Errors.NETWORK_EXCEPTION)

Review comment:
       Filed this: https://issues.apache.org/jira/browse/KAFKA-12355.







[GitHub] [kafka] jsancio commented on pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781784110


   > We do not plan to rely on the IBP in order to determine API versions for raft requests. Instead, we want to discover them through the ApiVersions API. This patch enables the flag to do so.
   
   How or where do we check that the connected nodes support the versions we require? Is that coming in a future PR?





[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579954768



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {

Review comment:
       Filed this: https://issues.apache.org/jira/browse/KAFKA-12353







[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579707394



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
     }
 
     def onComplete(clientResponse: ClientResponse): Unit = {
-      val response = if (clientResponse.authenticationException != null) {
-        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      val response = if (clientResponse.versionMismatch != null) {
+        errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+      } else if (clientResponse.authenticationException != null) {
+        // For now we treat authentication errors as retriable. We use the
+        // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+        // Note that `BrokerToControllerChannelManager` will still log the
+        // authentication errors so that users have a chance to fix the problem.
+        errorResponse(request.data, Errors.NETWORK_EXCEPTION)

Review comment:
       I agree it is debatable. If we make it a fatal error, I think we would do so under the expectation that the broker receiving the authentication failure is the one misconfigured. However, that may or may not be true. If a misconfigured broker tries to join the cluster, it could end up crashing all of the other brokers through authentication failures. Perhaps what we want is for authentication failures to be fatal during some initial window when the broker is starting up and non-fatal afterwards. In any case, because we do not have a good mechanism in place here to propagate fatal errors, it seemed simpler to treat it as retriable.
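
The "fatal during an initial window, non-fatal afterwards" idea floated above is not something this PR implements. As a rough sketch of what such a policy could look like, assuming a hypothetical `AuthFailurePolicy` class and a caller-supplied clock value in milliseconds:

```java
import java.time.Duration;

/** Hypothetical policy: authentication failures are fatal only shortly after startup. */
final class AuthFailurePolicy {
    private final long startupDeadlineMs;

    AuthFailurePolicy(long startTimeMs, Duration startupWindow) {
        this.startupDeadlineMs = startTimeMs + startupWindow.toMillis();
    }

    /** Returns true if an authentication failure observed at nowMs should be fatal. */
    boolean isFatal(long nowMs) {
        return nowMs < startupDeadlineMs;
    }
}
```

A broker that is misconfigured at startup would fail fast under this policy, while an established broker would keep retrying when some other node presents bad credentials. Choosing the window length and propagating the fatal error are left open, as in the comment.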







[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781799540


   @jsancio I may be misunderstanding your question, but api versions are negotiated internally in `NetworkClient`. If there are no compatible versions, then the client will raise an unsupported version error.
   
   It looks like it would be useful to have at least one test case to verify the expected behavior. At a glance, I could not find any logic to check the returned `ClientResponse` for version errors.
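
For readers unfamiliar with version negotiation, the core idea can be sketched as a range intersection. The real logic lives inside Kafka's `NetworkClient` and is not reproduced here; `VersionRange` and `latestUsable` are hypothetical names, and an empty result stands in for the unsupported version error mentioned above.

```java
import java.util.OptionalInt;

/** Hypothetical inclusive range of API versions supported by one side. */
final class VersionRange {
    final int min;
    final int max;

    VersionRange(int min, int max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        this.min = min;
        this.max = max;
    }

    /**
     * Picks the highest version both sides support, or returns empty when the
     * ranges do not overlap (the case that surfaces as an unsupported version error).
     */
    static OptionalInt latestUsable(VersionRange client, VersionRange broker) {
        int lowest = Math.max(client.min, broker.min);
        int highest = Math.min(client.max, broker.max);
        return lowest <= highest ? OptionalInt.of(highest) : OptionalInt.empty();
    }
}
```

For example, a client supporting versions 0 to 3 talking to a broker supporting 2 to 5 would settle on version 3, while a client limited to 0 to 1 would have no usable version against the same broker.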





[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579694847



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
     }
 
     def onComplete(clientResponse: ClientResponse): Unit = {
-      val response = if (clientResponse.authenticationException != null) {
-        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      val response = if (clientResponse.versionMismatch != null) {
+        errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+      } else if (clientResponse.authenticationException != null) {
+        // For now we treat authentication errors as retriable. We use the
+        // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+        // Note that `BrokerToControllerChannelManager` will still log the
+        // authentication errors so that users have a chance to fix the problem.
+        errorResponse(request.data, Errors.NETWORK_EXCEPTION)

Review comment:
       Why do we treat authentication problems as retriable?

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {

Review comment:
       Seems like it's pretty common to check for these two fields. Could we have an enum to represent the two errors and then have a function from the enum to the Error? Seems a bit easier to use and less error prone.
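
A minimal sketch of the enum suggestion, assuming hypothetical names throughout: `ErrorCode` stands in for the relevant subset of Kafka's `Errors`, and `ClientFailure` with `classify` and `toErrorCode` is invented for illustration. The mapping mirrors the `KafkaNetworkChannel` diff in this thread (authentication failures become `NETWORK_EXCEPTION`, version mismatches become `UNSUPPORTED_VERSION`).

```java
/** Hypothetical stand-in for the subset of Kafka's Errors used in this thread. */
enum ErrorCode { NONE, NETWORK_EXCEPTION, UNSUPPORTED_VERSION }

/** Hypothetical classification of the client-side failures a response can carry. */
enum ClientFailure {
    NONE(ErrorCode.NONE),
    AUTHENTICATION(ErrorCode.NETWORK_EXCEPTION),   // treated as retriable, as in the diff
    VERSION_MISMATCH(ErrorCode.UNSUPPORTED_VERSION);

    private final ErrorCode errorCode;

    ClientFailure(ErrorCode errorCode) {
        this.errorCode = errorCode;
    }

    /** The single place where a failure kind is translated to an error code. */
    ErrorCode toErrorCode() {
        return errorCode;
    }

    /** Classifies the two nullable exception fields checked in the handlers. */
    static ClientFailure classify(Exception authenticationException, Exception versionMismatch) {
        if (versionMismatch != null) {
            return VERSION_MISMATCH;
        }
        if (authenticationException != null) {
            return AUTHENTICATION;
        }
        return NONE;
    }
}
```

Each completion handler would then call `classify` once instead of repeating the two null checks, so the translation to an error code cannot drift between call sites.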







[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875306



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {

Review comment:
       I'm fine to improve this post 2.8, let's file a JIRA please.



