You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/15 08:17:32 UTC

[kafka] branch trunk updated: KAFKA-8336; Enable dynamic reconfiguration of broker's client-side certs (#6721)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 050fdd6  KAFKA-8336; Enable dynamic reconfiguration of broker's client-side certs (#6721)
050fdd6 is described below

commit 050fdd6537fcfb640277ff5e1607e44d18c0d95f
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed May 15 09:17:10 2019 +0100

    KAFKA-8336; Enable dynamic reconfiguration of broker's client-side certs (#6721)
    
    Enable reconfiguration of SSL keystores and truststores in the client-side channel builders that brokers use for controller, transaction coordinator and replica fetcher connections. This enables brokers that use TLS mutual authentication on the inter-broker listener to use short-lived certificates, which can be updated before they expire without restarting the brokers.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/common/security/ssl/SslFactory.java      |  2 +-
 .../test/java/org/apache/kafka/test/TestUtils.java | 12 ++++++++
 .../controller/ControllerChannelManager.scala      | 19 ++++++++----
 .../TransactionMarkerChannelManager.scala          |  6 +++-
 .../kafka/server/ReplicaFetcherBlockingSend.scala  | 14 +++++++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 35 ++++++++++++++++++++--
 6 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 73d9210..c31d3e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -175,7 +175,7 @@ public class SslFactory implements Reconfigurable {
                 SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore;
                 SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore;
                 this.sslContext = createSSLContext(keystore, truststore);
-                log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore);
+                log.info("Created new {} SSL context with keystore {} truststore {}", mode, keystore, truststore);
                 this.keystore = keystore;
                 this.truststore = truststore;
             } catch (Exception e) {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 58d900d..250ebc0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -464,4 +465,15 @@ public class TestUtils {
             fail("Missing value from Optional");
         }
     }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T fieldValue(Object o, Class<?> clazz, String fieldName)  {
+        try {
+            Field field = clazz.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            return (T) field.get(o);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 92aa421..3c46036 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
@@ -116,7 +116,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
     val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
     val brokerNode = broker.node(controllerToBrokerListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
-    val networkClient = {
+    val (networkClient, reconfigurableChannelBuilder) = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
         controllerToBrokerSecurityProtocol,
         JaasContext.Type.SERVER,
@@ -126,6 +126,12 @@ class ControllerChannelManager(controllerContext: ControllerContext,
         time,
         config.saslInterBrokerHandshakeRequestEnable
       )
+      val reconfigurableChannelBuilder = channelBuilder match {
+        case reconfigurable: Reconfigurable =>
+          config.addReconfigurable(reconfigurable)
+          Some(reconfigurable)
+        case _ => None
+      }
       val selector = new Selector(
         NetworkReceive.UNLIMITED,
         Selector.NO_IDLE_TIMEOUT_MS,
@@ -137,7 +143,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
         channelBuilder,
         logContext
       )
-      new NetworkClient(
+      val networkClient = new NetworkClient(
         selector,
         new ManualMetadataUpdater(Seq(brokerNode).asJava),
         config.brokerId.toString,
@@ -153,6 +159,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
         new ApiVersions,
         logContext
       )
+      (networkClient, reconfigurableChannelBuilder)
     }
     val threadName = threadNamePrefix match {
       case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
@@ -176,7 +183,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
     )
 
     brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
-      requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics))
+      requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
   }
 
   private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
@@ -187,6 +194,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
       // non-threadsafe classes as described in KAFKA-4959.
       // The call to shutdownLatch.await() in ShutdownableThread.shutdown() serves as a synchronization barrier that
       // hands off the NetworkClient from the RequestSendThread to the ZkEventThread.
+      brokerState.reconfigurableChannelBuilder.foreach(config.removeReconfigurable)
       brokerState.requestSendThread.shutdown()
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
@@ -573,5 +581,6 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      messageQueue: BlockingQueue[QueueItem],
                                      requestSendThread: RequestSendThread,
                                      queueSizeGauge: Gauge[Int],
-                                     requestRateAndTimeMetrics: Timer)
+                                     requestRateAndTimeMetrics: Timer,
+                                     reconfigurableChannelBuilder: Option[Reconfigurable])
 
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index f49fa7b..436ea2e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -22,7 +22,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients._
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
@@ -54,6 +54,10 @@ object TransactionMarkerChannelManager {
       time,
       config.saslInterBrokerHandshakeRequestEnable
     )
+    channelBuilder match {
+      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
+      case _ =>
+    }
     val selector = new Selector(
       NetworkReceive.UNLIMITED,
       config.connectionsMaxIdleMs,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 924111c..8e631fe 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 
 import scala.collection.JavaConverters._
@@ -51,7 +51,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
   private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
   private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
 
-  private val networkClient = {
+  private val (networkClient, reconfigurableChannelBuilder) = {
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
       brokerConfig.interBrokerSecurityProtocol,
       JaasContext.Type.SERVER,
@@ -61,6 +61,12 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       time,
       brokerConfig.saslInterBrokerHandshakeRequestEnable
     )
+    val reconfigurableChannelBuilder = channelBuilder match {
+      case reconfigurable: Reconfigurable =>
+        brokerConfig.addReconfigurable(reconfigurable)
+        Some(reconfigurable)
+      case _ => None
+    }
     val selector = new Selector(
       NetworkReceive.UNLIMITED,
       brokerConfig.connectionsMaxIdleMs,
@@ -72,7 +78,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       channelBuilder,
       logContext
     )
-    new NetworkClient(
+    val networkClient = new NetworkClient(
       selector,
       new ManualMetadataUpdater(),
       clientId,
@@ -88,6 +94,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       new ApiVersions,
       logContext
     )
+    (networkClient, reconfigurableChannelBuilder)
   }
 
   override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
@@ -108,6 +115,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
   }
 
   override def initiateClose(): Unit = {
+    reconfigurableChannelBuilder.foreach(brokerConfig.removeReconfigurable)
     networkClient.initiateClose()
   }
 
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index ed6638c..f08450a 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,12 +26,13 @@ import java.time.Duration
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent._
-
 import javax.management.ObjectName
+
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.network.{Processor, RequestChannel}
@@ -56,7 +57,7 @@ import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PR
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
-import org.apache.kafka.test.TestSslUtils
+import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.junit.Assert._
 import org.junit.{After, Before, Ignore, Test}
 import org.scalatest.Assertions.intercept
@@ -334,6 +335,25 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
     verifySslProduceConsume(sslProperties1, "alter-truststore-4")
     verifySslProduceConsume(sslProperties2, "alter-truststore-5")
+
+    // Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
+    // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
+    // and verify that metadata is propagated for new topic.
+    val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
+    props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
+    verifySslProduceConsume(sslProperties2, "alter-truststore-6")
+    props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
+    verifySslProduceConsume(sslProperties2, "alter-truststore-7")
+    waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
+
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    val controllerChannelManager = controller.kafkaController.controllerChannelManager
+    val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
+      JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
+    brokerStateInfo(0).networkClient.disconnect("0")
+    TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
   }
 
   @Test
@@ -1078,6 +1098,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
   }
 
+  private def waitForAuthenticationFailure(producerBuilder: ProducerBuilder): Unit = {
+    TestUtils.waitUntilTrue(() => {
+      try {
+        verifyAuthenticationFailure(producerBuilder.build())
+        true
+      } catch {
+        case e: Error => false
+      }
+    }, "Did not fail authentication with invalid config")
+  }
+
   private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
     val configResources = servers.map { server =>
       new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)