Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/11 05:46:02 UTC

[GitHub] [kafka] hachikuji opened a new pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

hachikuji opened a new pull request #9732:
URL: https://github.com/apache/kafka/pull/9732


   This patch contains the following improvements:
   
   - Separate inbound/outbound request flows so that we can open the door for concurrent inbound request handling
   - Rewrite `KafkaNetworkChannel` to use `InterBrokerSendThread` which fixes a number of bugs/shortcomings
   - Get rid of a lot of boilerplate conversions in `KafkaNetworkChannel` 
   - Improve validation of inbound responses in `KafkaRaftClient` by checking the correlationId. This fixes a bug which could cause an out-of-order Fetch to be applied incorrectly (see the sketch below).
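   A minimal Scala sketch of that correlation-id check (illustrative only; the real validation lives in `KafkaRaftClient`, and the exact fields and accessors may differ):
   
   ```scala
   import org.apache.kafka.raft.{RaftRequest, RaftResponse}
   
   // Hypothetical helper: only apply an inbound response if its correlationId matches
   // the request that is currently in flight to that node.
   def isExpectedResponse(
     pendingRequest: Option[RaftRequest.Outbound],
     response: RaftResponse.Inbound
   ): Boolean = {
     pendingRequest.exists(_.correlationId == response.correlationId)
   }
   ```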
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541274490



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -164,13 +164,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
                                       listenerName: ListenerName,
                                       time: Time,
                                       threadName: String)
-  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
+  extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
   private var activeController: Option[Node] = None
 
-  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
-
-  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+  def generateRequests(): Iterable[RequestAndCompletionHandler] = {

Review comment:
       I was just noticing that we only ever return a Queue with a single item. Optional seems to fit that case more cleanly. However (as we discussed offline), I wonder why we have the code like this in the first place. Maybe @abbccdda has some insight?
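    For illustration, a hedged Scala sketch of that single-item shape, reusing the names from the original `generateRequests` shown elsewhere in this thread (`requestQueue`, `activeController`, `handleResponse`); this is not the actual change:
    
    ```scala
    // Illustrative only: the single in-flight request expressed as an Option rather
    // than a mutable.Queue that never holds more than one element.
    private def nextRequest(): Option[RequestAndCompletionHandler] = {
      Option(requestQueue.poll()).map { topRequest =>
        RequestAndCompletionHandler(
          activeController.get,
          topRequest.request,
          handleResponse(topRequest)
        )
      }
    }
    ```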




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541158744



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##########
@@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
     }
 
     @Override
-    protected AddOffsetsToTxnRequestData data() {
+    public AddOffsetsToTxnRequestData data() {

Review comment:
       Yeah, not sure it's worth it, though I don't feel strongly. I think ultimately we're going to start relying more on the generated classes to avoid unnecessary conversions. We're now entering "phase 2" of the request overhaul which means we can start figuring out how to remove the AbstractRequest/AbstractResponse layer. I think it will take more smarts in the generated classes to make a dent here, but if we are agreed on the goal (?), then I do not think preserving the encapsulation here is worthwhile.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541216977



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -135,7 +117,16 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
       brokerToControllerListenerName, time, threadName)
   }
 
-  override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
+  /**
+   * Send request to the controller.
+   *
+   * @param request         The request to be sent.
+   * @param callback        Request completion callback.
+   * @param retryDeadlineMs The retry deadline which will only be checked after receiving a response.
+   *                        This means that in the worst case, the total timeout would be twice
+   *                        the configured timeout.
+   */
+  def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
                            callback: ControllerRequestCompletionHandler,

Review comment:
       nit: indent misaligned 

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.raft.RaftMessage;
+import org.apache.kafka.raft.RaftMessageQueue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class BlockingMessageQueue implements RaftMessageQueue {
+    private final BlockingQueue<RaftEvent> queue = new LinkedBlockingQueue<>();
+    private final AtomicInteger size = new AtomicInteger(0);
+
+    @Override
+    public RaftMessage poll(long timeoutMs) {
+        try {
+            RaftEvent event = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+            if (event instanceof MessageReceived) {
+                size.decrementAndGet();
+                return ((MessageReceived) event).message;
+            } else {
+                return null;
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+
+    }
+
+    @Override
+    public void offer(RaftMessage message) {
+        queue.add(new MessageReceived(message));
+        size.incrementAndGet();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size.get() == 0;
+    }
+
+    @Override
+    public void wakeup() {
+        queue.add(Wakeup.INSTANCE);
+    }
+
+    public interface RaftEvent {

Review comment:
       Could you use a sentinel RaftMessage object here instead? Might simplify this class a bit. Not a big deal either way
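    For illustration, a rough Scala sketch of the sentinel idea (the real class is Java, and the simplified shape below is an assumption, not the eventual implementation):
    
    ```scala
    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
    
    import org.apache.kafka.raft.RaftMessage
    
    // Enqueue a shared sentinel instead of wrapping every element in a RaftEvent.
    class SentinelMessageQueueSketch(wakeupSentinel: RaftMessage) {
      private val queue = new LinkedBlockingQueue[RaftMessage]()
    
      def poll(timeoutMs: Long): RaftMessage = {
        val message = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
        // The sentinel only unblocks the caller; it is never handed back as a real message.
        if (message == null || (message eq wakeupSentinel)) null else message
      }
    
      def offer(message: RaftMessage): Unit = queue.add(message)
    
      def wakeup(): Unit = queue.add(wakeupSentinel)
    }
    ```
    One wrinkle the original `size` counter addresses: with sentinels sitting in the queue, an accurate `isEmpty` would still need its own count of real messages.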

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftMessageQueue.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+/**
+ * This class is used to serialize inbound requests or responses to outbound requests.
+ * It basically just allows us to wrap a blocking queue so that we can have a mocked
+ * implementation which does not depend on system time.
+ *
+ * See {@link org.apache.kafka.raft.internals.BlockingMessageQueue}.
+ */
+public interface RaftMessageQueue {
+
+    /**
+     * Block for the arrival of a new message.
+     *
+     * @param timeoutMs timeout in milliseconds to wait for a new event
+     * @return the event or null if the timeout was reached

Review comment:
       nit: should add something like "timeout was reached or #wakeup() was called"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541199422



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##########
@@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
     }
 
     @Override
-    protected AddOffsetsToTxnRequestData data() {
+    public AddOffsetsToTxnRequestData data() {

Review comment:
       Support for optional fields would go a long way I think. I am not sure it will be possible to remove all intermediate representations, but perhaps they can be the exception and not the rule. Some version checks in `KafkaApis` are probably inevitable. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541221553



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,34 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    sendRequests(Seq(request))
+  }
 
-    generateRequests().foreach { request =>
+  def sendRequests(requests: Iterable[RequestAndCompletionHandler]): Unit = {
+    inboundQueue.addAll(requests.asJavaCollection)

Review comment:
       Yeah, I was going to change this to use `add` anyway to make the expectation clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9732:
URL: https://github.com/apache/kafka/pull/9732


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r545986703



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -34,25 +35,58 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+
 /**
  * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
  * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
  */
 trait AlterIsrManager {
-  def start(): Unit
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
 
   def enqueue(alterIsrItem: AlterIsrItem): Boolean
 
   def clearPending(topicPartition: TopicPartition): Unit
 }
 
-case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+object AlterIsrManager {
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    scheduler: KafkaScheduler,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    brokerEpochSupplier: () => Long
+  ): AlterIsrManager = {
+    val channelManager = new BrokerToControllerChannelManager(

Review comment:
       I guess we're not sharing these channel managers between anything. In that case, moving them into the classes that need them seems fine.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -152,76 +151,89 @@ abstract class ControllerRequestCompletionHandler extends RequestCompletionHandl
   def onTimeout(): Unit
 }
 
-case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       callback: ControllerRequestCompletionHandler,
-                                       deadlineMs: Long)
-
-class BrokerToControllerRequestThread(networkClient: KafkaClient,
-                                      metadataUpdater: ManualMetadataUpdater,
-                                      requestQueue: LinkedBlockingDeque[BrokerToControllerQueueItem],
-                                      metadataCache: kafka.server.MetadataCache,
-                                      config: KafkaConfig,
-                                      listenerName: ListenerName,
-                                      time: Time,
-                                      threadName: String)
-  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
-
+case class BrokerToControllerQueueItem(
+  createdTimeMs: Long,
+  request: AbstractRequest.Builder[_ <: AbstractRequest],
+  callback: ControllerRequestCompletionHandler
+)
+
+class BrokerToControllerRequestThread(
+  networkClient: KafkaClient,
+  metadataUpdater: ManualMetadataUpdater,
+  metadataCache: kafka.server.MetadataCache,
+  config: KafkaConfig,
+  listenerName: ListenerName,
+  time: Time,
+  threadName: String,
+  retryTimeoutMs: Long
+) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
+
+  private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private var activeController: Option[Node] = None
 
-  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
+  def enqueue(request: BrokerToControllerQueueItem): Unit = {
+    requestQueue.add(request)
+    if (activeController.isDefined) {
+      wakeup()
+    }
+  }
 
-  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
-    val requestsToSend = new mutable.Queue[RequestAndCompletionHandler]
-    val topRequest = requestQueue.poll()
-    if (topRequest != null) {
-      val request = RequestAndCompletionHandler(
-        activeController.get,
-        topRequest.request,
-        handleResponse(topRequest)
-      )
+  def queueSize: Int = {
+    requestQueue.size
+  }
 
-      requestsToSend.enqueue(request)
+  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+    val currentTimeMs = time.milliseconds()
+    val requestIter = requestQueue.iterator()

Review comment:
       Any reason to use an Iterator here instead of queue methods (i.e., peek and remove)? Is it to ensure a consistent view of the queue while we're going through it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541147023



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##########
@@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
     }
 
     @Override
-    protected AddOffsetsToTxnRequestData data() {
+    public AddOffsetsToTxnRequestData data() {

Review comment:
       It seems like there are many classes where direct access to `data` is not actually needed. How do you feel about having a public method in `RequestUtils` that exposes `data` for the raft layer instead? Something like:
   
   ```java
   public static ApiMessage requestData(AbstractRequest req)
   public static ApiMessage responseData(AbstractResponse resp)
   ```
   
   Or is it not worth it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541291359



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.raft.RaftMessage;
+import org.apache.kafka.raft.RaftMessageQueue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class BlockingMessageQueue implements RaftMessageQueue {
+    private final BlockingQueue<RaftEvent> queue = new LinkedBlockingQueue<>();
+    private final AtomicInteger size = new AtomicInteger(0);
+
+    @Override
+    public RaftMessage poll(long timeoutMs) {
+        try {
+            RaftEvent event = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+            if (event instanceof MessageReceived) {
+                size.decrementAndGet();
+                return ((MessageReceived) event).message;
+            } else {
+                return null;
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+
+    }
+
+    @Override
+    public void offer(RaftMessage message) {
+        queue.add(new MessageReceived(message));
+        size.incrementAndGet();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size.get() == 0;
+    }
+
+    @Override
+    public void wakeup() {
+        queue.add(Wakeup.INSTANCE);
+    }
+
+    public interface RaftEvent {

Review comment:
       That's a good idea.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541368311



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -164,13 +164,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
                                       listenerName: ListenerName,
                                       time: Time,
                                       threadName: String)
-  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
+  extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
   private var activeController: Option[Node] = None
 
-  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
-
-  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+  def generateRequests(): Iterable[RequestAndCompletionHandler] = {

Review comment:
       Yeah, I agree that looks a bit odd. It looks like it was intended to be a loop.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541225579



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -164,13 +164,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
                                       listenerName: ListenerName,
                                       time: Time,
                                       threadName: String)
-  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
+  extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
   private var activeController: Option[Node] = None
 
-  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
-
-  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+  def generateRequests(): Iterable[RequestAndCompletionHandler] = {

Review comment:
       Can you clarify what the option would achieve over an empty collection?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541221984



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,30 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    inboundQueue.offer(request)

Review comment:
       Haha, yeah, exactly what I was thinking.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543561935



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -53,6 +54,41 @@ object KafkaNetworkChannel {
 
 }
 
+private[raft] class RaftSendThread(

Review comment:
       Not really as far as I know. This is my favored style of late because it results in consistent alignment of parameters. I get really annoyed when I see stuff like this:
   ```
def someMethodWithAnArguablyOverVerboseName(foo: String,
                                            bar: Option[List[Int]]): (Option[String], List[String])

def conciseMethod(foo: String,
                  bar: String): Unit = {
   ```
   No matter how you look at it, it seems hideous.
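    For contrast, the style being advocated here (and used throughout this PR's diff) puts each parameter on its own line with the closing parenthesis back at the margin, roughly:
    
    ```scala
    // The parameter indentation no longer depends on the method name's length,
    // so renaming the method does not reflow every parameter line.
    def someMethodWithAnArguablyOverVerboseName(
      foo: String,
      bar: Option[List[Int]]
    ): (Option[String], List[String]) = {
      (None, Nil)
    }
    ```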




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543557442



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -221,7 +216,7 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
       } else {
         // need to backoff to avoid tight loops
         debug("No controller defined in metadata cache, retrying after backoff")
-        backoff()

Review comment:
       Also, if we don't need `backoff` anymore, we can remove it since this was the only usage




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543529599



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -32,17 +33,18 @@ import scala.jdk.CollectionConverters._
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
  */
-abstract class InterBrokerSendThread(name: String,
-                                     networkClient: KafkaClient,
-                                     time: Time,
-                                     isInterruptible: Boolean = true)
-  extends ShutdownableThread(name, isInterruptible) {
-
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
-  def requestTimeoutMs: Int
+class InterBrokerSendThread(
+  name: String,
+  networkClient: KafkaClient,
+  requestTimeoutMs: Int,
+  time: Time,
+  isInterruptible: Boolean = true
+) extends ShutdownableThread(name, isInterruptible) {
+
+  private val inboundQueue = new ConcurrentLinkedQueue[RequestAndCompletionHandler]()

Review comment:
       Yea, sounds good 👍 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541172565



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##########
@@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
     }
 
     @Override
-    protected AddOffsetsToTxnRequestData data() {
+    public AddOffsetsToTxnRequestData data() {

Review comment:
       I don't quite understand how we would handle versioning well if we only have data classes. Do you have thoughts on that?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541274830



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,34 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    sendRequests(Seq(request))
+  }
 
-    generateRequests().foreach { request =>
+  def sendRequests(requests: Iterable[RequestAndCompletionHandler]): Unit = {
+    inboundQueue.addAll(requests.asJavaCollection)

Review comment:
       Ah, didn't see your earlier comment 👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#issuecomment-744697922


   @abbccdda To clarify, what we are asking is why the implementation of `generateRequests` in `BrokerToControllerChannelManager` builds a queue for a single item.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543588101



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -221,7 +216,7 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
       } else {
         // need to backoff to avoid tight loops
         debug("No controller defined in metadata cache, retrying after backoff")
-        backoff()

Review comment:
       While addressing this issue, I realized the current timeout logic does not handle the case when the controller is not known. This will cause the requests to keep piling up until we find the controller.
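    A hedged sketch of the kind of fix implied here, reusing names from the diff above (`requestQueue`, `createdTimeMs`, `retryTimeoutMs`, `onTimeout`); `expireStaleRequests` is a hypothetical helper, not the actual change:
    
    ```scala
    // Enforce the retry deadline even while no controller is known, so queued
    // requests cannot pile up indefinitely.
    private def expireStaleRequests(currentTimeMs: Long): Unit = {
      val iter = requestQueue.iterator()
      while (iter.hasNext) {
        val request = iter.next()
        if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
          iter.remove()
          request.callback.onTimeout()
        }
      }
    }
    ```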




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541219622



##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -68,179 +51,93 @@ object KafkaNetworkChannel {
     }
   }
 
-  private[raft] def responseData(response: AbstractResponse): ApiMessage = {
-    response match {
-      case voteResponse: VoteResponse => voteResponse.data
-      case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data
-      case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data
-      case fetchResponse: FetchResponse[_] => fetchResponse.data
-      case _ => throw new IllegalArgumentException(s"Unexpected type for response: $response")
-    }
-  }
-
-  private[raft] def requestData(request: AbstractRequest): ApiMessage = {
-    request match {
-      case voteRequest: VoteRequest => voteRequest.data
-      case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data
-      case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data
-      case fetchRequest: FetchRequest => fetchRequest.data
-      case _ => throw new IllegalArgumentException(s"Unexpected type for request: $request")
-    }
-  }
-
 }
 
-class KafkaNetworkChannel(time: Time,
-                          client: KafkaClient,
-                          clientId: String,
-                          retryBackoffMs: Int,
-                          requestTimeoutMs: Int) extends NetworkChannel with Logging {
+class KafkaNetworkChannel(
+  time: Time,
+  client: KafkaClient,
+  requestTimeoutMs: Int
+) extends NetworkChannel with Logging {
   import KafkaNetworkChannel._
 
   type ResponseHandler = AbstractResponse => Unit
 
   private val correlationIdCounter = new AtomicInteger(0)
-  private val pendingInbound = mutable.Map.empty[Long, ResponseHandler]
-  private val undelivered = new ArrayBlockingQueue[RaftMessage](10)
-  private val pendingOutbound = new ArrayBlockingQueue[RaftRequest.Outbound](10)
   private val endpoints = mutable.HashMap.empty[Int, Node]
 
-  override def newCorrelationId(): Int = correlationIdCounter.getAndIncrement()
-
-  private def buildClientRequest(req: RaftRequest.Outbound): ClientRequest = {
-    val destination = req.destinationId.toString
-    val request = buildRequest(req.data)
-    val correlationId = req.correlationId
-    val createdTimeMs = req.createdTimeMs
-    new ClientRequest(destination, request, correlationId, clientId, createdTimeMs, true,
-      requestTimeoutMs, null)
-  }
-
-  override def send(message: RaftMessage): Unit = {
-    message match {
-      case request: RaftRequest.Outbound =>
-        if (!pendingOutbound.offer(request))
-          throw new KafkaException("Pending outbound queue is full")
-
-      case response: RaftResponse.Outbound =>
-        pendingInbound.remove(response.correlationId).foreach { onResponseReceived: ResponseHandler =>
-          onResponseReceived(buildResponse(response.data))
-        }
-      case _ =>
-        throw new IllegalArgumentException("Unhandled message type " + message)
+  private val requestThread = new InterBrokerSendThread(
+    name = "raft-outbound-request-thread",
+    networkClient = client,
+    requestTimeoutMs = requestTimeoutMs,
+    time = time,
+    isInterruptible = false
+  )
+
+  override def send(request: RaftRequest.Outbound): Unit = {
+    def completeFuture(message: ApiMessage): Unit = {
+      val response = new RaftResponse.Inbound(
+        request.correlationId,
+        message,
+        request.destinationId
+      )
+      request.completion.complete(response)
     }
-  }
 
-  private def sendOutboundRequests(currentTimeMs: Long): Unit = {
-    while (!pendingOutbound.isEmpty) {
-      val request = pendingOutbound.peek()
-      endpoints.get(request.destinationId) match {
-        case Some(node) =>
-          if (client.connectionFailed(node)) {
-            pendingOutbound.poll()
-            val apiKey = ApiKeys.forId(request.data.apiKey)
-            val disconnectResponse = RaftUtil.errorResponse(apiKey, Errors.BROKER_NOT_AVAILABLE)
-            val success = undelivered.offer(new RaftResponse.Inbound(
-              request.correlationId, disconnectResponse, request.destinationId))
-            if (!success) {
-              throw new KafkaException("Undelivered queue is full")
-            }
-
-            // Make sure to reset the connection state
-            client.ready(node, currentTimeMs)
-          } else if (client.ready(node, currentTimeMs)) {
-            pendingOutbound.poll()
-            val clientRequest = buildClientRequest(request)
-            client.send(clientRequest, currentTimeMs)
-          } else {
-            // We will retry this request on the next poll
-            return
-          }
-
-        case None =>
-          pendingOutbound.poll()
-          val apiKey = ApiKeys.forId(request.data.apiKey)
-          val responseData = RaftUtil.errorResponse(apiKey, Errors.BROKER_NOT_AVAILABLE)
-          val response = new RaftResponse.Inbound(request.correlationId, responseData, request.destinationId)
-          if (!undelivered.offer(response))
-            throw new KafkaException("Undelivered queue is full")
+    def onComplete(clientResponse: ClientResponse): Unit = {
+      val response = if (clientResponse.authenticationException != null) {
+        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      } else if (clientResponse.wasDisconnected()) {
+        errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE)
+      } else {
+        clientResponse.responseBody.data
       }
+      completeFuture(response)
     }
-  }
-
-  def getConnectionInfo(nodeId: Int): Node = {
-    if (!endpoints.contains(nodeId))
-      null
-    else
-      endpoints(nodeId)
-  }
-
-  def allConnections(): Set[Node] = {
-    endpoints.values.toSet
-  }
 
-  private def buildInboundRaftResponse(response: ClientResponse): RaftResponse.Inbound = {
-    val header = response.requestHeader()
-    val data = if (response.authenticationException != null) {
-      RaftUtil.errorResponse(header.apiKey, Errors.CLUSTER_AUTHORIZATION_FAILED)
-    } else if (response.wasDisconnected) {
-      RaftUtil.errorResponse(header.apiKey, Errors.BROKER_NOT_AVAILABLE)
-    } else {
-      responseData(response.responseBody)
-    }
-    new RaftResponse.Inbound(header.correlationId, data, response.destination.toInt)
-  }
+    endpoints.get(request.destinationId) match {
+      case Some(node) =>
+        requestThread.sendRequest(RequestAndCompletionHandler(
+          destination = node,
+          request = buildRequest(request.data),
+          handler = onComplete
+        ))
 
-  private def pollInboundResponses(timeoutMs: Long, inboundMessages: util.List[RaftMessage]): Unit = {
-    val responses = client.poll(timeoutMs, time.milliseconds())
-    for (response <- responses.asScala) {
-      inboundMessages.add(buildInboundRaftResponse(response))
+      case None =>
+        completeFuture(errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE))
     }
   }
 
-  private def drainInboundRequests(inboundMessages: util.List[RaftMessage]): Unit = {
-    undelivered.drainTo(inboundMessages)
+  def pollOnce(): Unit = {

Review comment:
       Yeah, let me make that clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541225075



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -32,17 +33,18 @@ import scala.jdk.CollectionConverters._
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
  */
-abstract class InterBrokerSendThread(name: String,
-                                     networkClient: KafkaClient,
-                                     time: Time,
-                                     isInterruptible: Boolean = true)
-  extends ShutdownableThread(name, isInterruptible) {
-
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
-  def requestTimeoutMs: Int
+class InterBrokerSendThread(
+  name: String,
+  networkClient: KafkaClient,
+  requestTimeoutMs: Int,
+  time: Time,
+  isInterruptible: Boolean = true
+) extends ShutdownableThread(name, isInterruptible) {
+
+  private val inboundQueue = new ConcurrentLinkedQueue[RequestAndCompletionHandler]()

Review comment:
       The `BrokerToControllerChannelManager` is probably the only case where this might be a concern since requests can be forwarded from the client. We wouldn't want to block the request handler if the queue gets too large, but we might want to start failing inbound requests. I'd suggest we create a separate JIRA to think about this. Sound fair?
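    A rough sketch of the fail-instead-of-block idea for that case (`maxPendingItems`, the boolean return, and the placement are illustrative assumptions, not a settled design):
    
    ```scala
    // Hypothetical variant of enqueue: reject new items once the queue is "full"
    // rather than blocking the request handler thread that forwards client requests.
    def tryEnqueue(request: BrokerToControllerQueueItem, maxPendingItems: Int): Boolean = {
      if (requestQueue.size() >= maxPendingItems) {
        false
      } else {
        requestQueue.add(request)
        if (activeController.isDefined)
          wakeup()
        true
      }
    }
    ```
    Note the size check here is best-effort rather than atomic; a real version might instead use a bounded queue whose `offer` fails when full.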




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541219472



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,30 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    inboundQueue.offer(request)
+    wakeup()
+  }
 
-    generateRequests().foreach { request =>
+  private def drainInboundQueue(): Unit = {
+    while (!inboundQueue.isEmpty) {
+      val request = inboundQueue.poll()
       val completionHandler = request.handler
       unsentRequests.put(request.destination,
         networkClient.newClientRequest(
           request.destination.idString,
           request.request,
-          now,
+          time.milliseconds(),
           true,
           requestTimeoutMs,
           completionHandler))
     }
+  }
 
+  override def doWork(): Unit = {
     try {
+      var now = time.milliseconds()

Review comment:
       That's fair. It's probably fine to compute it just once for `sendRequests`.
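    A hedged sketch of a drain loop that takes one timestamp for the whole batch and stops when `poll` returns null, using the names from the diff above:
    
    ```scala
    private def drainInboundQueue(currentTimeMs: Long): Unit = {
      // poll() returns null once the queue is empty, which ends the loop cleanly even
      // if the queue is drained concurrently.
      var request = inboundQueue.poll()
      while (request != null) {
        unsentRequests.put(
          request.destination,
          networkClient.newClientRequest(
            request.destination.idString,
            request.request,
            currentTimeMs, // one timestamp for the whole batch
            true,
            requestTimeoutMs,
            request.handler
          )
        )
        request = inboundQueue.poll()
      }
    }
    ```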




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543567541



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
##########
@@ -127,11 +127,14 @@ class TxnMarkerQueue(@volatile var destination: Node) {
   def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size)
 }
 
-class TransactionMarkerChannelManager(config: KafkaConfig,
-                                      metadataCache: MetadataCache,
-                                      networkClient: NetworkClient,
-                                      txnStateManager: TransactionStateManager,
-                                      time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup {
+class TransactionMarkerChannelManager(

Review comment:
       That's fair. I actually held myself back. I tried to only touch the cases that I was modifying anyway, but let me know if there are others. This one was especially obnoxious because of the long parameter list to `InterBrokerSendThread`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r546905110



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -152,76 +151,89 @@ abstract class ControllerRequestCompletionHandler extends RequestCompletionHandl
   def onTimeout(): Unit
 }
 
-case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       callback: ControllerRequestCompletionHandler,
-                                       deadlineMs: Long)
-
-class BrokerToControllerRequestThread(networkClient: KafkaClient,
-                                      metadataUpdater: ManualMetadataUpdater,
-                                      requestQueue: LinkedBlockingDeque[BrokerToControllerQueueItem],
-                                      metadataCache: kafka.server.MetadataCache,
-                                      config: KafkaConfig,
-                                      listenerName: ListenerName,
-                                      time: Time,
-                                      threadName: String)
-  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
-
+case class BrokerToControllerQueueItem(
+  createdTimeMs: Long,
+  request: AbstractRequest.Builder[_ <: AbstractRequest],
+  callback: ControllerRequestCompletionHandler
+)
+
+class BrokerToControllerRequestThread(
+  networkClient: KafkaClient,
+  metadataUpdater: ManualMetadataUpdater,
+  metadataCache: kafka.server.MetadataCache,
+  config: KafkaConfig,
+  listenerName: ListenerName,
+  time: Time,
+  threadName: String,
+  retryTimeoutMs: Long
+) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
+
+  private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private var activeController: Option[Node] = None
 
-  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
+  def enqueue(request: BrokerToControllerQueueItem): Unit = {
+    requestQueue.add(request)
+    if (activeController.isDefined) {
+      wakeup()
+    }
+  }
 
-  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
-    val requestsToSend = new mutable.Queue[RequestAndCompletionHandler]
-    val topRequest = requestQueue.poll()
-    if (topRequest != null) {
-      val request = RequestAndCompletionHandler(
-        activeController.get,
-        topRequest.request,
-        handleResponse(topRequest)
-      )
+  def queueSize: Int = {
+    requestQueue.size
+  }
 
-      requestsToSend.enqueue(request)
+  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+    val currentTimeMs = time.milliseconds()
+    val requestIter = requestQueue.iterator()

Review comment:
       Yeah, I just thought it was a little simpler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541156636



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,30 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    inboundQueue.offer(request)
+    wakeup()
+  }
 
-    generateRequests().foreach { request =>
+  private def drainInboundQueue(): Unit = {
+    while (!inboundQueue.isEmpty) {
+      val request = inboundQueue.poll()

Review comment:
       Similar to above comment, either check for null or use `remove()`

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -32,17 +33,18 @@ import scala.jdk.CollectionConverters._
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
  */
-abstract class InterBrokerSendThread(name: String,
-                                     networkClient: KafkaClient,
-                                     time: Time,
-                                     isInterruptible: Boolean = true)
-  extends ShutdownableThread(name, isInterruptible) {
-
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
-  def requestTimeoutMs: Int
+class InterBrokerSendThread(
+  name: String,
+  networkClient: KafkaClient,
+  requestTimeoutMs: Int,
+  time: Time,
+  isInterruptible: Boolean = true
+) extends ShutdownableThread(name, isInterruptible) {
+
+  private val inboundQueue = new ConcurrentLinkedQueue[RequestAndCompletionHandler]()

Review comment:
       Just wondering, is there any reason why we might want a bounded queue here? I suspect not since we never expect too many requests to be enqueued at once.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -164,13 +164,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
                                       listenerName: ListenerName,
                                       time: Time,
                                       threadName: String)
-  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
+  extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
   private var activeController: Option[Node] = None
 
-  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
-
-  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+  def generateRequests(): Iterable[RequestAndCompletionHandler] = {

Review comment:
       Not needed here, but I wonder if we should just make this return an `Option[RequestAndCompletionHandler]`

##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -68,179 +51,93 @@ object KafkaNetworkChannel {
     }
   }
 
-  private[raft] def responseData(response: AbstractResponse): ApiMessage = {
-    response match {
-      case voteResponse: VoteResponse => voteResponse.data
-      case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data
-      case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data
-      case fetchResponse: FetchResponse[_] => fetchResponse.data
-      case _ => throw new IllegalArgumentException(s"Unexpected type for response: $response")
-    }
-  }
-
-  private[raft] def requestData(request: AbstractRequest): ApiMessage = {
-    request match {
-      case voteRequest: VoteRequest => voteRequest.data
-      case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data
-      case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data
-      case fetchRequest: FetchRequest => fetchRequest.data
-      case _ => throw new IllegalArgumentException(s"Unexpected type for request: $request")
-    }
-  }
-
 }
 
-class KafkaNetworkChannel(time: Time,
-                          client: KafkaClient,
-                          clientId: String,
-                          retryBackoffMs: Int,
-                          requestTimeoutMs: Int) extends NetworkChannel with Logging {
+class KafkaNetworkChannel(
+  time: Time,
+  client: KafkaClient,
+  requestTimeoutMs: Int
+) extends NetworkChannel with Logging {
   import KafkaNetworkChannel._
 
   type ResponseHandler = AbstractResponse => Unit
 
   private val correlationIdCounter = new AtomicInteger(0)
-  private val pendingInbound = mutable.Map.empty[Long, ResponseHandler]
-  private val undelivered = new ArrayBlockingQueue[RaftMessage](10)
-  private val pendingOutbound = new ArrayBlockingQueue[RaftRequest.Outbound](10)
   private val endpoints = mutable.HashMap.empty[Int, Node]
 
-  override def newCorrelationId(): Int = correlationIdCounter.getAndIncrement()
-
-  private def buildClientRequest(req: RaftRequest.Outbound): ClientRequest = {
-    val destination = req.destinationId.toString
-    val request = buildRequest(req.data)
-    val correlationId = req.correlationId
-    val createdTimeMs = req.createdTimeMs
-    new ClientRequest(destination, request, correlationId, clientId, createdTimeMs, true,
-      requestTimeoutMs, null)
-  }
-
-  override def send(message: RaftMessage): Unit = {
-    message match {
-      case request: RaftRequest.Outbound =>
-        if (!pendingOutbound.offer(request))
-          throw new KafkaException("Pending outbound queue is full")
-
-      case response: RaftResponse.Outbound =>
-        pendingInbound.remove(response.correlationId).foreach { onResponseReceived: ResponseHandler =>
-          onResponseReceived(buildResponse(response.data))
-        }
-      case _ =>
-        throw new IllegalArgumentException("Unhandled message type " + message)
+  private val requestThread = new InterBrokerSendThread(
+    name = "raft-outbound-request-thread",
+    networkClient = client,
+    requestTimeoutMs = requestTimeoutMs,
+    time = time,
+    isInterruptible = false
+  )
+
+  override def send(request: RaftRequest.Outbound): Unit = {
+    def completeFuture(message: ApiMessage): Unit = {
+      val response = new RaftResponse.Inbound(
+        request.correlationId,
+        message,
+        request.destinationId
+      )
+      request.completion.complete(response)
     }
-  }
 
-  private def sendOutboundRequests(currentTimeMs: Long): Unit = {
-    while (!pendingOutbound.isEmpty) {
-      val request = pendingOutbound.peek()
-      endpoints.get(request.destinationId) match {
-        case Some(node) =>
-          if (client.connectionFailed(node)) {
-            pendingOutbound.poll()
-            val apiKey = ApiKeys.forId(request.data.apiKey)
-            val disconnectResponse = RaftUtil.errorResponse(apiKey, Errors.BROKER_NOT_AVAILABLE)
-            val success = undelivered.offer(new RaftResponse.Inbound(
-              request.correlationId, disconnectResponse, request.destinationId))
-            if (!success) {
-              throw new KafkaException("Undelivered queue is full")
-            }
-
-            // Make sure to reset the connection state
-            client.ready(node, currentTimeMs)
-          } else if (client.ready(node, currentTimeMs)) {
-            pendingOutbound.poll()
-            val clientRequest = buildClientRequest(request)
-            client.send(clientRequest, currentTimeMs)
-          } else {
-            // We will retry this request on the next poll
-            return
-          }
-
-        case None =>
-          pendingOutbound.poll()
-          val apiKey = ApiKeys.forId(request.data.apiKey)
-          val responseData = RaftUtil.errorResponse(apiKey, Errors.BROKER_NOT_AVAILABLE)
-          val response = new RaftResponse.Inbound(request.correlationId, responseData, request.destinationId)
-          if (!undelivered.offer(response))
-            throw new KafkaException("Undelivered queue is full")
+    def onComplete(clientResponse: ClientResponse): Unit = {
+      val response = if (clientResponse.authenticationException != null) {
+        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      } else if (clientResponse.wasDisconnected()) {
+        errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE)
+      } else {
+        clientResponse.responseBody.data
       }
+      completeFuture(response)
     }
-  }
-
-  def getConnectionInfo(nodeId: Int): Node = {
-    if (!endpoints.contains(nodeId))
-      null
-    else
-      endpoints(nodeId)
-  }
-
-  def allConnections(): Set[Node] = {
-    endpoints.values.toSet
-  }
 
-  private def buildInboundRaftResponse(response: ClientResponse): RaftResponse.Inbound = {
-    val header = response.requestHeader()
-    val data = if (response.authenticationException != null) {
-      RaftUtil.errorResponse(header.apiKey, Errors.CLUSTER_AUTHORIZATION_FAILED)
-    } else if (response.wasDisconnected) {
-      RaftUtil.errorResponse(header.apiKey, Errors.BROKER_NOT_AVAILABLE)
-    } else {
-      responseData(response.responseBody)
-    }
-    new RaftResponse.Inbound(header.correlationId, data, response.destination.toInt)
-  }
+    endpoints.get(request.destinationId) match {
+      case Some(node) =>
+        requestThread.sendRequest(RequestAndCompletionHandler(
+          destination = node,
+          request = buildRequest(request.data),
+          handler = onComplete
+        ))
 
-  private def pollInboundResponses(timeoutMs: Long, inboundMessages: util.List[RaftMessage]): Unit = {
-    val responses = client.poll(timeoutMs, time.milliseconds())
-    for (response <- responses.asScala) {
-      inboundMessages.add(buildInboundRaftResponse(response))
+      case None =>
+        completeFuture(errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE))
     }
   }
 
-  private def drainInboundRequests(inboundMessages: util.List[RaftMessage]): Unit = {
-    undelivered.drainTo(inboundMessages)
+  def pollOnce(): Unit = {

Review comment:
       Is this just for testing?

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,34 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    sendRequests(Seq(request))
+  }
 
-    generateRequests().foreach { request =>
+  def sendRequests(requests: Iterable[RequestAndCompletionHandler]): Unit = {
+    inboundQueue.addAll(requests.asJavaCollection)

Review comment:
       Just a quick note: you had `offer` here before which has different behavior. It probably won't matter though, since the queue is unbounded.
   
   (I only noticed this because I had commented on the `offer` before your latest commit 😄 )

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,30 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    inboundQueue.offer(request)
+    wakeup()
+  }
 
-    generateRequests().foreach { request =>
+  private def drainInboundQueue(): Unit = {
+    while (!inboundQueue.isEmpty) {
+      val request = inboundQueue.poll()
       val completionHandler = request.handler
       unsentRequests.put(request.destination,
         networkClient.newClientRequest(
           request.destination.idString,
           request.request,
-          now,
+          time.milliseconds(),
           true,
           requestTimeoutMs,
           completionHandler))
     }
+  }
 
+  override def doWork(): Unit = {
     try {
+      var now = time.milliseconds()

Review comment:
       Previously, all the requests we gathered from `generateRequests` would have the same timestamp which was also passed to the subsequent call to NetworkClient#ready and send (via `sendRequests`). 
   
   What's the reason for recomputing the timestamp for each request we create? 
   
   Should we get a newer timestamp for the call to NetworkClient? 
   
   Seems a little odd to create requests at t1, t2, etc. and then call NetworkClient.send with t0. I wonder if this would have unintended throttling side effects.
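
   A simplified sketch of the alternative described here, with hypothetical names standing in for the real classes: `now` is captured once per iteration and reused both for the batch of created requests and for the subsequent network-client call.

```scala
object SharedTimestampSketch {
  final case class PendingSend(destination: String, createdTimeMs: Long)

  // every request in the batch gets the same creation timestamp
  def prepareBatch(destinations: Seq[String], nowMs: Long): Seq[PendingSend] =
    destinations.map(dest => PendingSend(dest, nowMs))

  def doWorkOnce(destinations: Seq[String]): Unit = {
    val now = System.currentTimeMillis() // captured once at the top of the iteration
    val batch = prepareBatch(destinations, now)
    // a real implementation would also drive the poll/send call with `now`
    batch.foreach(send => println(s"sending to ${send.destination}, created at ${send.createdTimeMs}"))
  }
}
```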

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,30 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    inboundQueue.offer(request)

Review comment:
       We should probably either check the result of `offer` or use `BlockingQueue#add` instead. Since we're using an unbounded queue and never expect this to fail, I would lean towards `add`
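
   A tiny illustration of the `offer`-vs-`add` distinction on a hypothetical queue holder (names are not from the PR): on a capacity-bounded queue `offer()` returns false while `add()` throws, so `add()` turns an unexpected rejection into a loud failure instead of a silently dropped element.

```scala
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue

object OfferVsAddSketch {
  private val queue: Queue[String] = new ConcurrentLinkedQueue[String]()

  // add() throws IllegalStateException on rejection, whereas offer() would
  // just return false; on this unbounded queue neither ever happens
  def enqueue(msg: String): Unit = {
    queue.add(msg)
  }
}
```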




[GitHub] [kafka] ijuma commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541378653



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##########
@@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
     }
 
     @Override
-    protected AddOffsetsToTxnRequestData data() {
+    public AddOffsetsToTxnRequestData data() {

Review comment:
       I can see this:
   
   1. AbstractRequest/AbstractResponse methods become part of the ApiMessage hierarchy.
   2. FooRequest/FooResponse extends FooDataRequest/FooDataResponse (like Colin suggested before)
   
   But I don't think you want to eliminate FooRequest/FooResponse in the example above. You don't need to perform conversions for the inner classes, but it's a place where you can normalize the representation. We do that for many of the existing request/response classes.




[GitHub] [kafka] mumrah commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543533537



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -149,29 +152,32 @@ case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: Abs
 
 class BrokerToControllerRequestThread(networkClient: KafkaClient,
                                       metadataUpdater: ManualMetadataUpdater,
-                                      requestQueue: LinkedBlockingDeque[BrokerToControllerQueueItem],
                                       metadataCache: kafka.server.MetadataCache,
                                       config: KafkaConfig,
                                       listenerName: ListenerName,
                                       time: Time,
                                       threadName: String)
   extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
+  private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private var activeController: Option[Node] = None
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler] = {
-    val requestsToSend = new mutable.Queue[RequestAndCompletionHandler]
-    val topRequest = requestQueue.poll()
-    if (topRequest != null) {
-      val request = RequestAndCompletionHandler(
+  def enqueue(request: BrokerToControllerQueueItem): Unit = {
+    requestQueue.add(request)
+    if (activeController.isDefined) {
+      wakeup()
+    }
+  }
+
+  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+    Option(requestQueue.poll()).map { queueItem =>

Review comment:
       👍 

##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -53,6 +54,41 @@ object KafkaNetworkChannel {
 
 }
 
+private[raft] class RaftSendThread(

Review comment:
       style nit/question: I think we have a mixture of argument indentation styles for Scala classes/methods. Do we have an established convention for this? Normally I align with the opening paren when breaking arguments out onto their own lines (though I'm not sure that's correct).
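
   Purely as a style illustration with hypothetical classes, the two conventions being compared look like this:

```scala
// arguments aligned under the opening paren
class AlignedWithParenStyle(name: String,
                            timeoutMs: Int,
                            interruptible: Boolean)

// one argument per line with the closing paren on its own line
class TrailingParenStyle(
  name: String,
  timeoutMs: Int,
  interruptible: Boolean
)
```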

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.raft.RaftMessage;
+import org.apache.kafka.raft.RaftMessageQueue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class BlockingMessageQueue implements RaftMessageQueue {
+    private static final RaftMessage WAKEUP_MESSAGE = new RaftMessage() {

Review comment:
       👍 

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftMessageQueue.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+/**
+ * This class is used to serialize inbound requests or responses to outbound requests.
+ * It basically just allows us to wrap a blocking queue so that we can have a mocked
+ * implementation which does not depend on system time.
+ *
+ * See {@link org.apache.kafka.raft.internals.BlockingMessageQueue}.
+ */
+public interface RaftMessageQueue {
+
+    /**
+     * Block for the arrival of a new message.
+     *
+     * @param timeoutMs timeout in milliseconds to wait for a new event
+     * @return the event or null if either the timeout was reached or there was
+     *     a call to {@link #wakeup()} before any events became available
+     */
+    RaftMessage poll(long timeoutMs);
+
+    /**
+     * Offer a new message to the queue.
+     *
+     * @param message the message to deliver
+     * @throws IllegalStateException if the queue cannot accept the message
+     */
+    void offer(RaftMessage message);

Review comment:
       nit: Maybe name this `add` so it aligns with the java.util.Queue method? 

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -32,17 +32,19 @@ import scala.jdk.CollectionConverters._
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
  */
-abstract class InterBrokerSendThread(name: String,
-                                     networkClient: KafkaClient,
-                                     time: Time,
-                                     isInterruptible: Boolean = true)
-  extends ShutdownableThread(name, isInterruptible) {
+abstract class InterBrokerSendThread(
+  name: String,
+  networkClient: KafkaClient,
+  requestTimeoutMs: Int,
+  time: Time,
+  isInterruptible: Boolean = true
+) extends ShutdownableThread(name, isInterruptible) {
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
-  def requestTimeoutMs: Int
   private val unsentRequests = new UnsentRequests
 
-  def hasUnsentRequests = unsentRequests.iterator().hasNext
+  def generateRequests(): Iterable[RequestAndCompletionHandler]
+
+  def hasUnsentRequests: Boolean = unsentRequests.iterator().hasNext

Review comment:
       Is it worth adding a `size` or `isEmpty` to `UnsentRequests`? 
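
   A hypothetical sketch of what an `isEmpty` helper could look like if `UnsentRequests` tracked requests per destination in a simple map (simplified types, not the actual class): emptiness is answered directly instead of building an iterator at the call site.

```scala
import scala.collection.mutable

class UnsentRequestsSketch {
  private val byDestination = mutable.Map.empty[String, mutable.Buffer[String]]

  def put(destination: String, request: String): Unit =
    byDestination.getOrElseUpdate(destination, mutable.Buffer.empty[String]) += request

  // answers emptiness without materializing an iterator at the call site
  def isEmpty: Boolean = byDestination.valuesIterator.forall(_.isEmpty)

  def hasUnsentRequests: Boolean = !isEmpty
}
```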

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
##########
@@ -127,11 +127,14 @@ class TxnMarkerQueue(@volatile var destination: Node) {
   def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size)
 }
 
-class TransactionMarkerChannelManager(config: KafkaConfig,
-                                      metadataCache: MetadataCache,
-                                      networkClient: NetworkClient,
-                                      txnStateManager: TransactionStateManager,
-                                      time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup {
+class TransactionMarkerChannelManager(

Review comment:
       nit: (related to style question elsewhere) if we want to change the style of these class definitions, can we do it as a separate PR? I always find it difficult when style changes are conflated with logical changes

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -221,7 +216,7 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
       } else {
         // need to backoff to avoid tight loops
         debug("No controller defined in metadata cache, retrying after backoff")
-        backoff()

Review comment:
       Hmm, this seems strange, though maybe I'm missing something.
   
   If we get here, `activeController` is not defined. If we then call pollOnce, it looks like the request produced by `generateRequests` will get an exception, since the activeController Option is empty.
   
   Previously, the backoff method was just pausing the thread for some time.
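
   A simplified sketch of the previous behavior being described, with hypothetical names and a plain sleep standing in for the real backoff mechanism: the thread pauses before retrying when no active controller is known, rather than spinning in a tight loop.

```scala
object ControllerBackoffSketch {
  val retryBackoffMs: Long = 100L // illustrative value only

  def maybeBackoff(activeController: Option[String]): Unit = {
    if (activeController.isEmpty) {
      // previously, backoff() paused here so the thread would not immediately
      // generate a request that is bound to fail
      Thread.sleep(retryBackoffMs)
    }
  }
}
```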




[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541370421



##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -51,22 +53,30 @@ abstract class InterBrokerSendThread(name: String,
     awaitShutdown()
   }
 
-  override def doWork(): Unit = {
-    var now = time.milliseconds()
+  def sendRequest(request: RequestAndCompletionHandler): Unit = {
+    inboundQueue.offer(request)
+    wakeup()
+  }
 
-    generateRequests().foreach { request =>
+  private def drainInboundQueue(): Unit = {
+    while (!inboundQueue.isEmpty) {
+      val request = inboundQueue.poll()
       val completionHandler = request.handler
       unsentRequests.put(request.destination,
         networkClient.newClientRequest(
           request.destination.idString,
           request.request,
-          now,
+          time.milliseconds(),
           true,
           requestTimeoutMs,
           completionHandler))
     }
+  }
 
+  override def doWork(): Unit = {
     try {
+      var now = time.milliseconds()

Review comment:
       I ended up moving creation time to RequestAndCompletionHandler.
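
   A sketch of what carrying the creation time on the request holder could look like; the field names are illustrative guesses, not necessarily what the PR ends up with.

```scala
// field names here are illustrative, not necessarily the PR's
final case class RequestAndCompletionHandlerSketch(
  creationTimeMs: Long,
  destination: String,
  requestDescription: String
)

object CreationTimeSketch {
  // the timestamp is fixed at the moment the request is handed to the send thread
  def enqueueAt(nowMs: Long, destination: String): RequestAndCompletionHandlerSketch =
    RequestAndCompletionHandlerSketch(nowMs, destination, "Fetch")
}
```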




[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r541199422



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##########
@@ -53,7 +53,7 @@ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
     }
 
     @Override
-    protected AddOffsetsToTxnRequestData data() {
+    public AddOffsetsToTxnRequestData data() {

Review comment:
       Support for optional fields would go a long way I think. I am not sure it will be possible to remove all intermediate representations, but perhaps it can be the exception and not the rule. Some version checks in `KafkaApis` are probably inevitable. 




[GitHub] [kafka] abbccdda commented on pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#issuecomment-744661249


   Just to reply to https://github.com/apache/kafka/pull/9732#discussion_r541192299 here: `generateRequests()` is part of the InterBrokerSendThread, which I didn't write up. In the meantime, I think changing it to one element at a time makes sense for the AlterIsr and forwarding cases for now; if we believe that would make the interface easier to use, let's refactor it.


[GitHub] [kafka] hachikuji commented on a change in pull request #9732: KAFKA-10842; Use `InterBrokerSendThread` for raft's outbound network channel

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9732:
URL: https://github.com/apache/kafka/pull/9732#discussion_r543568828



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -221,7 +216,7 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
       } else {
         // need to backoff to avoid tight loops
         debug("No controller defined in metadata cache, retrying after backoff")
-        backoff()

Review comment:
       Good catch. Will fix. Let me check on test cases.



