You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/11/08 21:28:50 UTC

[kafka] branch trunk updated: KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb3335e  KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575)
eb3335e is described below

commit eb3335ef592a3dd22895a5bb499a2d0b232227a7
Author: Jonathan Santilli <jo...@users.noreply.github.com>
AuthorDate: Thu Nov 8 21:28:37 2018 +0000

    KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575)
    
    * Add logic to retry the BrokerInfo registration into ZooKeeper
    
    In case the ZooKeeper session has been regenerated and the broker
    tries to register the BrokerInfo into Zookeeper, this code deletes
    the current BrokerInfo from Zookeeper and creates it again, just if
    the znode ephemeral owner belongs to the Broker which tries to register
    himself again into ZooKeeper
    
    * Add test to validate the BrokerInfo re-registration into ZooKeeper
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 88 ++++++++++++++++++++--
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 60 +++++++++++++++
 2 files changed, 141 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index a12abb4..4ad40ef 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -53,7 +53,7 @@ import scala.collection.JavaConverters._
  * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
  * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
  */
-class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
+class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
   Logging with KafkaMetricsGroup {
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
@@ -67,6 +67,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   // Only for testing
   private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper
 
+  // This variable holds the Zookeeper session id at the moment a Broker gets registered in Zookeeper and the subsequent
+  // updates of the session id. It is possible that the session id changes over the time for 'Session expired'.
+  // This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must
+  // be deleted.
+  private var currentZooKeeperSessionId: Long = -1
+
   /**
    * Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect
    * and a monotonically increasing number will be appended to its name.
@@ -1585,7 +1591,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
 
-  private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
+  private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
     retryRequestsUntilConnected(Seq(request)).head
   }
 
@@ -1631,26 +1637,94 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
       throw KeeperException.create(code)
   }
 
+  private def isZKSessionIdDiffFromCurrentZKSessionId(): Boolean = {
+    zooKeeperClient.sessionId != currentZooKeeperSessionId
+  }
+
+  private def isZKSessionTheEphemeralOwner(ephemeralOwnerId: Long): Boolean = {
+    ephemeralOwnerId == currentZooKeeperSessionId
+  }
+
+  private[zk] def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = {
+    isZKSessionTheEphemeralOwner(ephemeralOwnerId) && isZKSessionIdDiffFromCurrentZKSessionId()
+  }
+
+  private def updateCurrentZKSessionId(newSessionId: Long): Unit = {
+    currentZooKeeperSessionId = newSessionId
+  }
+
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
     def create(): Code = {
       val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
       val createResponse = retryRequestUntilConnected(createRequest)
-      createResponse.resultCode match {
-        case code@ Code.OK => code
-        case Code.NODEEXISTS => getAfterNodeExists()
+      val createResultCode = createResponse.resultCode match {
+        case code@ Code.OK =>
+          code
+        case Code.NODEEXISTS =>
+          getAfterNodeExists()
         case code =>
           error(s"Error while creating ephemeral at $path with return code: $code")
           code
       }
+
+      if (createResultCode == Code.OK) {
+        // At this point, we need to save a reference to the zookeeper session id.
+        // This is done here since the Zookeeper session id may not be available at the Object creation time.
+        // This is assuming the 'retryRequestUntilConnected' method got connected and a valid session id is present.
+        // This code is part of the workaround done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code
+        // must be deleted.
+        updateCurrentZKSessionId(zooKeeperClient.sessionId)
+      }
+
+      createResultCode
+    }
+
+    // This method is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must
+    // be deleted.
+    private def delete(): Code = {
+      val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion)
+      val deleteResponse = retryRequestUntilConnected(deleteRequest)
+      deleteResponse.resultCode match {
+        case code@ Code.OK => code
+        case code@ Code.NONODE => code
+        case code =>
+          error(s"Error while deleting ephemeral node at $path with return code: $code")
+          code
+      }
+    }
+
+    private def reCreate(): Code = {
+      val codeAfterDelete = delete()
+      var codeAfterReCreate = codeAfterDelete
+      debug(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete")
+      if (codeAfterDelete == Code.OK || codeAfterDelete == Code.NONODE) {
+        codeAfterReCreate = create()
+        debug(s"Result of znode ephemeral re-creation at $path is: $codeAfterReCreate")
+      }
+      codeAfterReCreate
     }
 
     private def getAfterNodeExists(): Code = {
       val getDataRequest = GetDataRequest(path)
       val getDataResponse = retryRequestUntilConnected(getDataRequest)
+      val ephemeralOwnerId = getDataResponse.stat.getEphemeralOwner
       getDataResponse.resultCode match {
-        case Code.OK if getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId =>
+        // At this point, the Zookeeper session could be different (due a 'Session expired') from the one that initially
+        // registered the Broker into the Zookeeper ephemeral node, but the znode is still present in ZooKeeper.
+        // The expected behaviour is that Zookeeper server removes the ephemeral node associated with the expired session
+        // but due an already reported bug in Zookeeper (ZOOKEEPER-2985) this is not happening, so, the following check
+        // will validate if this Broker got registered with the previous (expired) session and try to register again,
+        // deleting the ephemeral node and creating it again.
+        // This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must
+        // be deleted.
+        case Code.OK if shouldReCreateEphemeralZNode(ephemeralOwnerId) =>
+          info(s"Was not possible to create the ephemeral at $path, node already exists and owner " +
+            s"'$ephemeralOwnerId' does not match current session '${zooKeeperClient.sessionId}'" +
+            s", trying to delete and re-create it with the newest Zookeeper session")
+          reCreate()
+        case Code.OK if ephemeralOwnerId != zooKeeperClient.sessionId =>
           error(s"Error while creating ephemeral at $path, node already exists and owner " +
-            s"'${getDataResponse.stat.getEphemeralOwner}' does not match current session '${zooKeeperClient.sessionId}'")
+            s"'$ephemeralOwnerId' does not match current session '${zooKeeperClient.sessionId}'")
           Code.NODEEXISTS
         case code@ Code.OK => code
         case Code.NONODE =>
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 61ca3bb..6de9159 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,6 +59,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val controllerEpochZkVersion = 0
 
   var otherZkClient: KafkaZkClient = _
+  var expiredSessionZkClient: ExpiredKafkaZkClient = _
 
   @Before
   override def setUp(): Unit = {
@@ -66,6 +67,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createControllerEpochRaw(1)
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    expiredSessionZkClient = ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled),
+      zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
 
   @After
@@ -73,6 +76,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     if (otherZkClient != null)
       otherZkClient.close()
     zkClient.deletePath(ControllerEpochZNode.path)
+    if (expiredSessionZkClient != null)
+      expiredSessionZkClient.close()
     super.tearDown()
   }
 
@@ -684,6 +689,31 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testRetryRegisterBrokerInfo(): Unit = {
+    val brokerId = 5
+    val brokerPort = 9999
+    val brokerHost = "test.host"
+    val expiredBrokerInfo = createBrokerInfo(brokerId, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT)
+    expiredSessionZkClient.createTopLevelPaths()
+
+    // Register the broker, for the first time
+    expiredSessionZkClient.registerBroker(expiredBrokerInfo)
+    assertEquals(Some(expiredBrokerInfo.broker), expiredSessionZkClient.getBroker(brokerId))
+    val originalCzxid = expiredSessionZkClient.getPathCzxid(BrokerIdZNode.path(brokerId))
+
+    // Here, the node exists already, when trying to register under a different session id,
+    // the node will be deleted and created again using the new session id.
+    expiredSessionZkClient.registerBroker(expiredBrokerInfo)
+
+    // The broker info should be the same, no error should be raised
+    assertEquals(Some(expiredBrokerInfo.broker), expiredSessionZkClient.getBroker(brokerId))
+    val newCzxid = expiredSessionZkClient.getPathCzxid(BrokerIdZNode.path(brokerId))
+
+    assertNotEquals("The Czxid of original ephemeral znode should be different " +
+      "from the new ephemeral znode Czxid", originalCzxid, newCzxid)
+  }
+
+  @Test
   def testGetBrokerMethods(): Unit = {
     zkClient.createTopLevelPaths()
 
@@ -1112,4 +1142,34 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertEquals(expectedConsumerGroupOffsetsPath, actualConsumerGroupOffsetsPath)
   }
+
+  class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
+    extends KafkaZkClient(zooKeeperClient, isSecure, time) {
+    // Overwriting this method from the parent class to force the client to re-register the Broker.
+    override def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = {
+      true
+    }
+
+    def getPathCzxid(path: String): Long = {
+      val getDataRequest = GetDataRequest(path)
+      val getDataResponse = retryRequestUntilConnected(getDataRequest)
+
+      getDataResponse.stat.getCzxid
+    }
+  }
+
+  private object ExpiredKafkaZkClient {
+    def apply(connectString: String,
+              isSecure: Boolean,
+              sessionTimeoutMs: Int,
+              connectionTimeoutMs: Int,
+              maxInFlightRequests: Int,
+              time: Time,
+              metricGroup: String = "kafka.server",
+              metricType: String = "SessionExpireListener") = {
+      val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
+        time, metricGroup, metricType)
+      new ExpiredKafkaZkClient(zooKeeperClient, isSecure, time)
+    }
+  }
 }