You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/18 10:31:28 UTC

[kafka] branch 1.0 updated: KAFKA-6772: Load credentials from ZK before accepting connections (#4867)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 1dc89ff  KAFKA-6772: Load credentials from ZK before accepting connections (#4867)
1dc89ff is described below

commit 1dc89ff7df96de7a4b1c009786a44123e1d5b4aa
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Apr 18 10:20:28 2018 +0100

    KAFKA-6772: Load credentials from ZK before accepting connections (#4867)
    
    Start processing client connections only after completing KafkaServer initialization to ensure that credentials are loaded from ZK into the cache before authentications are processed. Acceptors are started earlier so that the bound port is known for registering in ZK.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 .../main/scala/kafka/network/SocketServer.scala    | 36 +++++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  6 +-
 .../kafka/server/ScramServerStartupTest.scala      | 72 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4366fea..1d1bbed 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -75,9 +75,18 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private var stoppedProcessingRequests = false
 
   /**
-   * Start the socket server
+   * Start the socket server. Acceptors for all the listeners are started. Processors
+   * are started if `startupProcessors` is true. If not, processors are only started when
+   * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors
+   * is used to delay processing client connections until server is fully initialized, e.g.
+   * to ensure that all credentials have been loaded before authentications are performed.
+   * Acceptors are always started during `startup` so that the bound port is known when this
+   * method completes even when ephemeral ports are used. Incoming connections on this server
+   * are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
+   *
+   * @param startupProcessors Flag indicating whether `Processor`s must be started.
    */
-  def startup() {
+  def startup(startupProcessors: Boolean = true) {
     this.synchronized {
 
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
@@ -103,6 +112,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
         processorBeginIndex = processorEndIndex
       }
+      if (startupProcessors) {
+        startProcessors()
+      }
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -131,6 +143,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   // register the processor threads for notification of responses
   requestChannel.addResponseListener(id => processors(id).wakeup())
+  /**
+   * Starts processors of all the acceptors of this server if they have not already been started.
+   * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]]
+   * was invoked with `startupProcessors=false`.
+   */
+  def startProcessors(): Unit = synchronized {
+    acceptors.values.foreach { _.startProcessors() }
+    info(s"Started processors for ${acceptors.size} acceptors")
+  }
 
   /**
     * Stop processing requests and new connections.
@@ -270,11 +291,14 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
+  private val processorsStarted = new AtomicBoolean
 
-  this.synchronized {
-    processors.foreach { processor =>
-      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
-        processor).start()
+  private[network] def startProcessors(): Unit = synchronized {
+    if (!processorsStarted.getAndSet(true)) {
+      processors.foreach { processor =>
+        KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+          processor).start()
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index cfdce2a..f0f09e7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -225,8 +225,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         metadataCache = new MetadataCache(config.brokerId)
         credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
 
+        // Create and start the socket server acceptor threads so that the bound port is known.
+        // Delay starting processors until the end of the initialization sequence to ensure
+        // that credentials have been loaded before processing authentications.
         socketServer = new SocketServer(config, metrics, time, credentialProvider)
-        socketServer.startup()
+        socketServer.startup(startupProcessors = false)
 
         /* start replica manager */
         replicaManager = createReplicaManager(isShuttingDown)
@@ -289,6 +292,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
         checkpointBrokerId(config.brokerId)
 
+        socketServer.startProcessors()
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
new file mode 100644
index 0000000..317fb7f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
@@ -0,0 +1,72 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+import java.util.Collections
+
+import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
+import kafka.utils._
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests that there are no failed authentications during broker startup. This is to verify
+ * that SCRAM credentials are loaded by brokers before client connections can be made.
+ * For simplicity of testing, this test verifies authentications of controller connections.
+ */
+class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
+
+  override val producerCount = 0
+  override val consumerCount = 0
+  override val serverCount = 1
+
+  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala
+
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+    // Create credentials before starting brokers
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), KafkaSasl))
+  }
+
+  @Test
+  def testAuthentications(): Unit = {
+    val successfulAuths = totalAuthentications("successful-authentication-total")
+    assertTrue("No successful authentications", successfulAuths > 0)
+    val failedAuths = totalAuthentications("failed-authentication-total")
+    assertEquals(0, failedAuths)
+  }
+
+  private def totalAuthentications(metricName: String): Int = {
+    val allMetrics = servers.head.metrics.metrics
+    val totalAuthCount = allMetrics.values().asScala.filter(_.metricName().name() == metricName)
+      .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
+    totalAuthCount.toInt
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.