You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/15 08:17:32 UTC
[kafka] branch trunk updated: KAFKA-8336;
Enable dynamic reconfiguration of broker's client-side certs (#6721)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 050fdd6 KAFKA-8336; Enable dynamic reconfiguration of broker's client-side certs (#6721)
050fdd6 is described below
commit 050fdd6537fcfb640277ff5e1607e44d18c0d95f
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed May 15 09:17:10 2019 +0100
KAFKA-8336; Enable dynamic reconfiguration of broker's client-side certs (#6721)
Enable reconfiguration of SSL keystores and truststores in client-side channel builders used by brokers for controller, transaction coordinator and replica fetchers. This enables brokers using TLS mutual authentication for inter-broker listener to use short-lived certs that may be updated before expiry without restarting brokers.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../kafka/common/security/ssl/SslFactory.java | 2 +-
.../test/java/org/apache/kafka/test/TestUtils.java | 12 ++++++++
.../controller/ControllerChannelManager.scala | 19 ++++++++----
.../TransactionMarkerChannelManager.scala | 6 +++-
.../kafka/server/ReplicaFetcherBlockingSend.scala | 14 +++++++--
.../server/DynamicBrokerReconfigurationTest.scala | 35 ++++++++++++++++++++--
6 files changed, 76 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 73d9210..c31d3e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -175,7 +175,7 @@ public class SslFactory implements Reconfigurable {
SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore;
SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore;
this.sslContext = createSSLContext(keystore, truststore);
- log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore);
+ log.info("Created new {} SSL context with keystore {} truststore {}", mode, keystore, truststore);
this.keystore = keystore;
this.truststore = truststore;
} catch (Exception e) {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 58d900d..250ebc0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -464,4 +465,15 @@ public class TestUtils {
fail("Missing value from Optional");
}
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T fieldValue(Object o, Class<?> clazz, String fieldName) {
+ try {
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (T) field.get(o);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 92aa421..3c46036 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
@@ -116,7 +116,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
val brokerNode = broker.node(controllerToBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
- val networkClient = {
+ val (networkClient, reconfigurableChannelBuilder) = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerToBrokerSecurityProtocol,
JaasContext.Type.SERVER,
@@ -126,6 +126,12 @@ class ControllerChannelManager(controllerContext: ControllerContext,
time,
config.saslInterBrokerHandshakeRequestEnable
)
+ val reconfigurableChannelBuilder = channelBuilder match {
+ case reconfigurable: Reconfigurable =>
+ config.addReconfigurable(reconfigurable)
+ Some(reconfigurable)
+ case _ => None
+ }
val selector = new Selector(
NetworkReceive.UNLIMITED,
Selector.NO_IDLE_TIMEOUT_MS,
@@ -137,7 +143,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
channelBuilder,
logContext
)
- new NetworkClient(
+ val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
@@ -153,6 +159,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
new ApiVersions,
logContext
)
+ (networkClient, reconfigurableChannelBuilder)
}
val threadName = threadNamePrefix match {
case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
@@ -176,7 +183,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
)
brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
- requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics))
+ requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
}
private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
@@ -187,6 +194,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
// non-threadsafe classes as described in KAFKA-4959.
// The call to shutdownLatch.await() in ShutdownableThread.shutdown() serves as a synchronization barrier that
// hands off the NetworkClient from the RequestSendThread to the ZkEventThread.
+ brokerState.reconfigurableChannelBuilder.foreach(config.removeReconfigurable)
brokerState.requestSendThread.shutdown()
brokerState.networkClient.close()
brokerState.messageQueue.clear()
@@ -573,5 +581,6 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread,
queueSizeGauge: Gauge[Int],
- requestRateAndTimeMetrics: Timer)
+ requestRateAndTimeMetrics: Timer,
+ reconfigurableChannelBuilder: Option[Reconfigurable])
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index f49fa7b..436ea2e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -22,7 +22,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients._
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
@@ -54,6 +54,10 @@ object TransactionMarkerChannelManager {
time,
config.saslInterBrokerHandshakeRequestEnable
)
+ channelBuilder match {
+ case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
+ case _ =>
+ }
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 924111c..8e631fe 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Node, Reconfigurable}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import scala.collection.JavaConverters._
@@ -51,7 +51,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
- private val networkClient = {
+ private val (networkClient, reconfigurableChannelBuilder) = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
brokerConfig.interBrokerSecurityProtocol,
JaasContext.Type.SERVER,
@@ -61,6 +61,12 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
time,
brokerConfig.saslInterBrokerHandshakeRequestEnable
)
+ val reconfigurableChannelBuilder = channelBuilder match {
+ case reconfigurable: Reconfigurable =>
+ brokerConfig.addReconfigurable(reconfigurable)
+ Some(reconfigurable)
+ case _ => None
+ }
val selector = new Selector(
NetworkReceive.UNLIMITED,
brokerConfig.connectionsMaxIdleMs,
@@ -72,7 +78,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
channelBuilder,
logContext
)
- new NetworkClient(
+ val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(),
clientId,
@@ -88,6 +94,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
new ApiVersions,
logContext
)
+ (networkClient, reconfigurableChannelBuilder)
}
override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
@@ -108,6 +115,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
}
override def initiateClose(): Unit = {
+ reconfigurableChannelBuilder.foreach(brokerConfig.removeReconfigurable)
networkClient.initiateClose()
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index ed6638c..f08450a 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,12 +26,13 @@ import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._
-
import javax.management.ObjectName
+
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
import kafka.network.{Processor, RequestChannel}
@@ -56,7 +57,7 @@ import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PR
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
-import org.apache.kafka.test.TestSslUtils
+import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept
@@ -334,6 +335,25 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
verifySslProduceConsume(sslProperties1, "alter-truststore-4")
verifySslProduceConsume(sslProperties2, "alter-truststore-5")
+
+ // Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
+ // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
+ // and verify that metadata is propagated for new topic.
+ val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
+ props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
+ TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
+ verifySslProduceConsume(sslProperties2, "alter-truststore-6")
+ props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
+ TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
+ verifySslProduceConsume(sslProperties2, "alter-truststore-7")
+ waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
+
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+ val controllerChannelManager = controller.kafkaController.controllerChannelManager
+ val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
+ JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
+ brokerStateInfo(0).networkClient.disconnect("0")
+ TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
}
@Test
@@ -1078,6 +1098,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
+ private def waitForAuthenticationFailure(producerBuilder: ProducerBuilder): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ try {
+ verifyAuthenticationFailure(producerBuilder.build())
+ true
+ } catch {
+ case e: Error => false
+ }
+ }, "Did not fail authentication with invalid config")
+ }
+
private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
val configResources = servers.map { server =>
new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)