Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/09 10:19:21 UTC

[kafka] branch 3.2 updated: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (#12381)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 1180f5e10ca KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (#12381)
1180f5e10ca is described below

commit 1180f5e10ca21652e55f83a6b64de6672a4f0a89
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Sat Jul 9 12:06:02 2022 +0200

    KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (#12381)
    
    What:
    When a certificate is rotated on a broker via dynamic configuration and the previous certificate expires, the broker-to-controller connection starts failing with an SSL handshake failure.
    
    Why:
    A similar fix was performed earlier in #6721, but when BrokerToControllerChannelManager was introduced in v2.7, dynamic reconfiguration was not enabled for its channel.
    
    Summary of testing strategy (including rationale)
    Add a test that fails prior to the fix in this PR and succeeds afterwards. The bug wasn't caught earlier because there was no test coverage for this scenario.
    
    Reviewers: Luke Chen <sh...@gmail.com>
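
For illustration, here is a minimal sketch of the kind of dynamic certificate rotation described in the commit message, driven through the Kafka Admin client from Scala. The listener name ("external"), broker id, keystore path and password below are placeholder assumptions, not values taken from this commit:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource

object RotateBrokerKeystore extends App {
  // Placeholder bootstrap address; point this at a real broker.
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = Admin.create(props)
  try {
    // Per-broker dynamic config resource for broker id 0 (placeholder id).
    val broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0")
    // Listener-prefixed SSL keystore settings can be updated at runtime;
    // such an update is what triggers reconfiguration of registered channels.
    val ops: java.util.Collection[AlterConfigOp] = java.util.Arrays.asList(
      new AlterConfigOp(new ConfigEntry(
        "listener.name.external.ssl.keystore.location", "/etc/kafka/ssl/new-keystore.jks"), OpType.SET),
      new AlterConfigOp(new ConfigEntry(
        "listener.name.external.ssl.keystore.password", "new-keystore-password"), OpType.SET))
    admin.incrementalAlterConfigs(Collections.singletonMap(broker0, ops)).all().get()
  } finally {
    admin.close()
  }
}

Until this patch, the broker-to-controller channel never registered for such updates, so once the previously configured certificate expired, its connections kept failing the SSL handshake.
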
---
 .../server/BrokerToControllerChannelManager.scala  |  7 ++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 28 +++++++++++++---
 .../BrokerToControllerRequestThreadTest.scala      | 37 ++++++----------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 24 ++++++++++++--
 4 files changed, 60 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index b671c700ac6..7ef04f0b8ec 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -19,12 +19,11 @@ package kafka.server
 
 import java.util.concurrent.LinkedBlockingDeque
 import java.util.concurrent.atomic.AtomicReference
-
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
 import kafka.raft.RaftManager
 import kafka.utils.Logging
 import org.apache.kafka.clients._
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
@@ -188,6 +187,10 @@ class BrokerToControllerChannelManagerImpl(
         config.saslInterBrokerHandshakeRequestEnable,
         logContext
       )
+      channelBuilder match {
+        case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
+        case _ =>
+      }
       val selector = new Selector(
         NetworkReceive.UNLIMITED,
         Selector.NO_IDLE_TIMEOUT_MS,
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index dee4f01da24..dfff9075f8e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,7 +26,6 @@ import java.time.Duration
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent._
-
 import javax.management.ObjectName
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
@@ -36,9 +35,9 @@ import kafka.log.{CleanerConfig, LogConfig}
 import kafka.message.ProducerCompressionCodec
 import kafka.metrics.KafkaYammerMetrics
 import kafka.network.{Processor, RequestChannel}
-import kafka.server.QuorumTestHarness
 import kafka.utils._
 import kafka.utils.Implicits._
+import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -53,11 +52,12 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.provider.FileConfigProvider
 import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.Quota
-import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
+import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter, Quota}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.requests.MetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
@@ -430,6 +430,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       verifyProduceConsume(producer, consumer, 10, topic)
     }
 
+    def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
+      val nonControllerBroker = servers.find(_.config.brokerId != controller.config.brokerId).get
+      val brokerToControllerManager = nonControllerBroker.clientToControllerChannelManager
+      val completionHandler = new TestControllerRequestCompletionHandler()
+      brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new MetadataRequestData()), completionHandler)
+      TestUtils.waitUntilTrue(() => {
+        completionHandler.completed.get() || completionHandler.timedOut.get()
+      }, "Timed out while waiting for broker to controller API call")
+      // we do not expect a timeout from broker to controller request
+      assertFalse(completionHandler.timedOut.get(), "broker to controller request timed out")
+      assertTrue(completionHandler.actualResponse.isDefined, "No response recorded even though request is completed")
+      val response = completionHandler.actualResponse.get
+      assertNull(response.authenticationException(), s"Request failed due to authentication error ${response.authenticationException}")
+      assertNull(response.versionMismatch(), s"Request failed due to unsupported version error ${response.versionMismatch}")
+      assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
+    }
+
     // Produce/consume should work with old as well as new client keystore
     verifySslProduceConsume(sslProperties1, "alter-truststore-1")
     verifySslProduceConsume(sslProperties2, "alter-truststore-2")
@@ -470,6 +487,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
     brokerStateInfo(0).networkClient.disconnect("0")
     TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
+
+    // validate that the brokerToController request works fine
+    verifyBrokerToControllerCall(controller)
   }
 
   @Test
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index 3297ec01ece..bee1aefaca2 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -19,13 +19,14 @@ package kafka.server
 
 import java.nio.ByteBuffer
 import java.util.Collections
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 import kafka.utils.TestUtils
+import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
 import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient, NodeApiVersions}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.message.{EnvelopeResponseData, MetadataRequestData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, MetadataResponse, RequestTestUtils}
+import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, RequestTestUtils}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.MockTime
@@ -51,7 +52,7 @@ class BrokerToControllerRequestThreadTest {
       config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
-    val completionHandler = new TestRequestCompletionHandler(None)
+    val completionHandler = new TestControllerRequestCompletionHandler(None)
     val queueItem = BrokerToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
@@ -89,7 +90,7 @@ class BrokerToControllerRequestThreadTest {
     testRequestThread.started = true
     mockClient.prepareResponse(expectedResponse)
 
-    val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+    val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
     val queueItem = BrokerToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
@@ -130,7 +131,7 @@ class BrokerToControllerRequestThreadTest {
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
-    val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+    val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
     val queueItem = BrokerToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
@@ -180,7 +181,7 @@ class BrokerToControllerRequestThreadTest {
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
-    val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+    val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
     val queueItem = BrokerToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()
@@ -243,7 +244,7 @@ class BrokerToControllerRequestThreadTest {
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
-    val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+    val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
     val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "principal", true)
     val kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
 
@@ -305,7 +306,7 @@ class BrokerToControllerRequestThreadTest {
       config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
-    val completionHandler = new TestRequestCompletionHandler()
+    val completionHandler = new TestControllerRequestCompletionHandler()
     val queueItem = BrokerToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()
@@ -419,7 +420,7 @@ class BrokerToControllerRequestThreadTest {
     val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
 
-    val completionHandler = new TestRequestCompletionHandler(None)
+    val completionHandler = new TestControllerRequestCompletionHandler(None)
     val queueItem = BrokerToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
@@ -445,22 +446,4 @@ class BrokerToControllerRequestThreadTest {
       fail(s"Condition failed to be met after polling $tries times")
     }
   }
-
-  class TestRequestCompletionHandler(
-    expectedResponse: Option[MetadataResponse] = None
-  ) extends ControllerRequestCompletionHandler {
-    val completed: AtomicBoolean = new AtomicBoolean(false)
-    val timedOut: AtomicBoolean = new AtomicBoolean(false)
-
-    override def onComplete(response: ClientResponse): Unit = {
-      expectedResponse.foreach { expected =>
-        assertEquals(expected, response.responseBody())
-      }
-      completed.set(true)
-    }
-
-    override def onTimeout(): Unit = {
-      timedOut.set(true)
-    }
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 37819d29618..8d921e54f12 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -42,7 +42,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.utils.Implicits._
 import kafka.zk._
-import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
@@ -61,7 +61,7 @@ import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, RequestContext, RequestHeader}
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
@@ -2180,4 +2180,22 @@ object TestUtils extends Logging {
         s"${unexpected.mkString("`", ",", "`")}")
   }
 
-}
+  class TestControllerRequestCompletionHandler(expectedResponse: Option[AbstractResponse] = None)
+    extends ControllerRequestCompletionHandler {
+    var actualResponse: Option[ClientResponse] = Option.empty
+    val completed: AtomicBoolean = new AtomicBoolean(false)
+    val timedOut: AtomicBoolean = new AtomicBoolean(false)
+
+    override def onComplete(response: ClientResponse): Unit = {
+      actualResponse = Some(response)
+      expectedResponse.foreach { expected =>
+        assertEquals(expected, response.responseBody())
+      }
+      completed.set(true)
+    }
+
+    override def onTimeout(): Unit = {
+      timedOut.set(true)
+    }
+  }
+}
\ No newline at end of file
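
As background on the fix in BrokerToControllerChannelManager.scala above: Kafka's SSL and SASL channel builders implement org.apache.kafka.common.Reconfigurable, so matching on that type and calling config.addReconfigurable is what lets dynamic keystore updates reach the channel. A rough, hypothetical sketch of the Reconfigurable contract (this class is illustrative only and not part of the commit):

import java.util
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{ConfigException, SslConfigs}

// Illustrative only: a Reconfigurable that just reports keystore updates.
// Once registered with the broker's dynamic config, it is validated and
// then reconfigured whenever one of its declared keys changes.
class LoggingKeystoreReconfigurable extends Reconfigurable {

  override def configure(configs: util.Map[String, _]): Unit = ()

  // The keys whose dynamic updates this instance cares about.
  override def reconfigurableConfigs(): util.Set[String] = {
    val keys = new util.HashSet[String]()
    keys.add(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)
    keys.add(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)
    keys
  }

  // Called before the update is applied; throwing rejects the new config.
  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
    if (configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) == null)
      throw new ConfigException("ssl.keystore.location must be provided")
  }

  // Called after validation with the updated values.
  override def reconfigure(configs: util.Map[String, _]): Unit =
    println(s"Reloading keystore from ${configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)}")
}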