You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/17 20:34:49 UTC

[GitHub] [kafka] mumrah opened a new pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

mumrah opened a new pull request #10340:
URL: https://github.com/apache/kafka/pull/10340


   Also use different log prefixes for the different channels. Log prefixes now look like:
   
   ```
   [BrokerToControllerChannelManager broker=1 name=alterIsr] ...
   [BrokerToControllerChannelManager broker=1 name=forwarding] ...
   [BrokerToControllerChannelManager broker=1 name=heartbeat] ...
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#issuecomment-801967783


   Is this a 2.8 blocker?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#discussion_r596399619



##########
File path: core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
##########
@@ -319,12 +325,44 @@ class BrokerToControllerRequestThreadTest {
 
     val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
+    testRequestThread.start()
 
     testRequestThread.enqueue(queueItem)
     pollUntil(testRequestThread, () => callbackResponse.get != null)
     assertNotNull(callbackResponse.get.authenticationException)
   }
 
+  @Test
+  def testThreadNotStarted(): Unit = {
+    // Make sure we throw if we enqueue anything while the thread is not running
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val controllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val activeController = new Node(controllerId, "host", 1234)
+
+    when(controllerNodeProvider.get()).thenReturn(Some(activeController))

Review comment:
       We can probably simplify the test a little bit. We shouldn't need to provide a controller if the request never gets enqueued.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#issuecomment-801997350


   Thanks @mumrah.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#discussion_r596364783



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -168,7 +168,7 @@ class BrokerToControllerChannelManagerImpl(
   threadNamePrefix: Option[String],
   retryTimeoutMs: Long
 ) extends BrokerToControllerChannelManager with Logging {
-  private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")

Review comment:
       Shall we use this in defining the thread name passed to `BrokerToControllerRequestThread` as well? The thread name is used for the `logIdent` in `ShutdownableThread`.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -304,6 +304,9 @@ class BrokerToControllerRequestThread(
   }
 
   def enqueue(request: BrokerToControllerQueueItem): Unit = {
+    if (!this.isAlive) {

Review comment:
       Is it worth adding a simple test case for this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#discussion_r597014831



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -295,6 +295,9 @@ class BrokerToControllerRequestThread(
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
 
+  // Used for testing
+  private[server] var started = false

Review comment:
       Does it need to be thread-safe?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #10340:
URL: https://github.com/apache/kafka/pull/10340


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#issuecomment-801992943


   @ijuma no this only affects trunk. It looks like the bug was introduced only a few days ago


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#discussion_r597106362



##########
File path: core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
##########
@@ -319,12 +325,40 @@ class BrokerToControllerRequestThreadTest {
 
     val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
+    testRequestThread.started = true
 
     testRequestThread.enqueue(queueItem)
     pollUntil(testRequestThread, () => callbackResponse.get != null)
     assertNotNull(callbackResponse.get.authenticationException)
   }
 
+  @Test
+  def testThreadNotStarted(): Unit = {
+    // Make sure we throw if we enqueue anything while the thread is not running
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+
+    val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))

Review comment:
       I don't think we need this either since the request never gets sent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org