You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/09 10:06:23 UTC
[kafka] branch trunk updated: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (#12381)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc6e91e1992 KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (#12381)
fc6e91e1992 is described below
commit fc6e91e19920a41a56ff60c65e3b9719f4506977
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Sat Jul 9 12:06:02 2022 +0200
KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (#12381)
What:
When a certificate is rotated on a broker via dynamic configuration and the previous certificate expires, the broker to controller connection starts failing with SSL Handshake failed.
Why:
A similar fix was earlier performed in #6721 but when BrokerToControllerChannelManager was introduced in v2.7, we didn't enable dynamic reconfiguration for its channel.
Summary of testing strategy (including rationale)
Add a test which fails prior to the fix done in the PR and succeeds afterwards. The bug wasn't caught earlier because there was no test coverage to validate the scenario.
Reviewers: Luke Chen <sh...@gmail.com>
---
.../server/BrokerToControllerChannelManager.scala | 7 ++--
.../server/DynamicBrokerReconfigurationTest.scala | 25 +++++++++++++--
.../BrokerToControllerRequestThreadTest.scala | 37 ++++++----------------
.../test/scala/unit/kafka/utils/TestUtils.scala | 24 ++++++++++++--
4 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 37f3a47e291..a4879798342 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -19,12 +19,11 @@ package kafka.server
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference
-
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.raft.RaftManager
import kafka.utils.Logging
import org.apache.kafka.clients._
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Node, Reconfigurable}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
@@ -188,6 +187,10 @@ class BrokerToControllerChannelManagerImpl(
config.saslInterBrokerHandshakeRequestEnable,
logContext
)
+ channelBuilder match {
+ case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
+ case _ =>
+ }
val selector = new Selector(
NetworkReceive.UNLIMITED,
Selector.NO_IDLE_TIMEOUT_MS,
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index d4294717101..ccfe63e7b56 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,7 +26,6 @@ import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._
-
import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
@@ -35,9 +34,9 @@ import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.{CleanerConfig, LogConfig}
import kafka.message.ProducerCompressionCodec
import kafka.network.{Processor, RequestChannel}
-import kafka.server.QuorumTestHarness
import kafka.utils._
import kafka.utils.Implicits._
+import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -52,10 +51,12 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.provider.FileConfigProvider
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter, Quota}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
@@ -429,6 +430,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyProduceConsume(producer, consumer, 10, topic)
}
+ def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
+ val nonControllerBroker = servers.find(_.config.brokerId != controller.config.brokerId).get
+ val brokerToControllerManager = nonControllerBroker.clientToControllerChannelManager
+ val completionHandler = new TestControllerRequestCompletionHandler()
+ brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new MetadataRequestData()), completionHandler)
+ TestUtils.waitUntilTrue(() => {
+ completionHandler.completed.get() || completionHandler.timedOut.get()
+ }, "Timed out while waiting for broker to controller API call")
+ // we do not expect a timeout from broker to controller request
+ assertFalse(completionHandler.timedOut.get(), "broker to controller request is timeout")
+ assertTrue(completionHandler.actualResponse.isDefined, "No response recorded even though request is completed")
+ val response = completionHandler.actualResponse.get
+ assertNull(response.authenticationException(), s"Request failed due to authentication error ${response.authenticationException}")
+ assertNull(response.versionMismatch(), s"Request failed due to unsupported version error ${response.versionMismatch}")
+ assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
+ }
+
// Produce/consume should work with old as well as new client keystore
verifySslProduceConsume(sslProperties1, "alter-truststore-1")
verifySslProduceConsume(sslProperties2, "alter-truststore-2")
@@ -469,6 +487,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
+
+ // validate that the brokerToController request works fine
+ verifyBrokerToControllerCall(controller)
}
@Test
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index 3297ec01ece..bee1aefaca2 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -19,13 +19,14 @@ package kafka.server
import java.nio.ByteBuffer
import java.util.Collections
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
import kafka.utils.TestUtils
+import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient, NodeApiVersions}
import org.apache.kafka.common.Node
import org.apache.kafka.common.message.{EnvelopeResponseData, MetadataRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, MetadataResponse, RequestTestUtils}
+import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, RequestTestUtils}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
@@ -51,7 +52,7 @@ class BrokerToControllerRequestThreadTest {
config, time, "", retryTimeoutMs)
testRequestThread.started = true
- val completionHandler = new TestRequestCompletionHandler(None)
+ val completionHandler = new TestControllerRequestCompletionHandler(None)
val queueItem = BrokerToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
@@ -89,7 +90,7 @@ class BrokerToControllerRequestThreadTest {
testRequestThread.started = true
mockClient.prepareResponse(expectedResponse)
- val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+ val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
val queueItem = BrokerToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
@@ -130,7 +131,7 @@ class BrokerToControllerRequestThreadTest {
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
- val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+ val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
val queueItem = BrokerToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
@@ -180,7 +181,7 @@ class BrokerToControllerRequestThreadTest {
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
- val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+ val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
val queueItem = BrokerToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()
@@ -243,7 +244,7 @@ class BrokerToControllerRequestThreadTest {
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
- val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+ val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "principal", true)
val kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
@@ -305,7 +306,7 @@ class BrokerToControllerRequestThreadTest {
config, time, "", retryTimeoutMs)
testRequestThread.started = true
- val completionHandler = new TestRequestCompletionHandler()
+ val completionHandler = new TestControllerRequestCompletionHandler()
val queueItem = BrokerToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()
@@ -419,7 +420,7 @@ class BrokerToControllerRequestThreadTest {
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
- val completionHandler = new TestRequestCompletionHandler(None)
+ val completionHandler = new TestControllerRequestCompletionHandler(None)
val queueItem = BrokerToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
@@ -445,22 +446,4 @@ class BrokerToControllerRequestThreadTest {
fail(s"Condition failed to be met after polling $tries times")
}
}
-
- class TestRequestCompletionHandler(
- expectedResponse: Option[MetadataResponse] = None
- ) extends ControllerRequestCompletionHandler {
- val completed: AtomicBoolean = new AtomicBoolean(false)
- val timedOut: AtomicBoolean = new AtomicBoolean(false)
-
- override def onComplete(response: ClientResponse): Unit = {
- expectedResponse.foreach { expected =>
- assertEquals(expected, response.responseBody())
- }
- completed.set(true)
- }
-
- override def onTimeout(): Unit = {
- timedOut.set(true)
- }
- }
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 01a888c1667..5a8d43795ae 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -41,7 +41,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.utils.Implicits._
import kafka.zk._
-import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
@@ -61,7 +61,7 @@ import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
@@ -2242,4 +2242,22 @@ object TestUtils extends Logging {
s"${unexpected.mkString("`", ",", "`")}")
}
-}
+ class TestControllerRequestCompletionHandler(expectedResponse: Option[AbstractResponse] = None)
+ extends ControllerRequestCompletionHandler {
+ var actualResponse: Option[ClientResponse] = Option.empty
+ val completed: AtomicBoolean = new AtomicBoolean(false)
+ val timedOut: AtomicBoolean = new AtomicBoolean(false)
+
+ override def onComplete(response: ClientResponse): Unit = {
+ actualResponse = Some(response)
+ expectedResponse.foreach { expected =>
+ assertEquals(expected, response.responseBody())
+ }
+ completed.set(true)
+ }
+
+ override def onTimeout(): Unit = {
+ timedOut.set(true)
+ }
+ }
+}
\ No newline at end of file