You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/10 05:04:35 UTC

[kyuubi] branch master updated: [KYUUBI #4145] Change lock and polling seq_num path on service discovery

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 547e5ca61 [KYUUBI #4145] Change lock and polling seq_num path on service discovery
547e5ca61 is described below

commit 547e5ca6176d386f5d8a8b9aadb9c285450f0e37
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Fri Feb 10 13:04:24 2023 +0800

    [KYUUBI #4145] Change lock and polling seq_num path on service discovery
    
    ### _Why are the changes needed?_
    
    This PR proposes to change the paths of distributed lock and seq_num(used for POLLING engine pool select policy) on the Service Discovery component. The reason is that namespace `${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/` should be dedicated to engine registration, we'd better use the separated namespace for other functionalities.
    
    - lock path
    ```
    # before
    ${serverSpace}_${shareLevel}_${engineType}/lock/${user}/${subdomain}
    
    # after
    ${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock/${user}/${subdomain}
    ```
    
    - seq_num
    ```
    # before
    ${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/seq_num/${user}/${poolName}
    
    # after
    ${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum/${user}/${poolName}
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4145 from pan3793/namespace.
    
    Closes #4145
    
    c912b3f66 [Cheng Pan] name
    3326b9b95 [Cheng Pan] name
    10083db0b [Cheng Pan] Change lock and polloing seq_num path on service discovery
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  2 +-
 .../ha/client/etcd/EtcdDiscoveryClient.scala       | 40 +++++++++++-----------
 .../zookeeper/ZookeeperDiscoveryClient.scala       | 40 +++++++++++-----------
 .../scala/org/apache/kyuubi/engine/EngineRef.scala | 10 +++---
 .../org/apache/kyuubi/engine/EngineRefTests.scala  |  2 +-
 5 files changed, 46 insertions(+), 48 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index b46674d06..01bd46bd3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1807,7 +1807,7 @@ object KyuubiConf {
     .intConf
     .createWithDefault(-1)
 
-  val ENGINE_POOL_BALANCE_POLICY: ConfigEntry[String] =
+  val ENGINE_POOL_SELECT_POLICY: ConfigEntry[String] =
     buildConf("kyuubi.engine.pool.selectPolicy")
       .doc("The select policy of an engine from the corresponding engine pool engine for " +
         "a session. <ul>" +
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
index ad3a0550c..80a70f2f2 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
@@ -90,7 +90,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def createClient(): Unit = {
+  override def createClient(): Unit = {
     client = buildClient()
     kvClient = client.getKVClient()
     lockClient = client.getLockClient()
@@ -99,13 +99,13 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     leaseTTL = conf.get(HighAvailabilityConf.HA_ETCD_LEASE_TIMEOUT) / 1000
   }
 
-  def closeClient(): Unit = {
+  override def closeClient(): Unit = {
     if (client != null) {
       client.close()
     }
   }
 
-  def create(path: String, mode: String, createParent: Boolean = true): String = {
+  override def create(path: String, mode: String, createParent: Boolean = true): String = {
     // createParent can not effect here
     mode match {
       case "PERSISTENT" => kvClient.put(
@@ -116,7 +116,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     path
   }
 
-  def getData(path: String): Array[Byte] = {
+  override def getData(path: String): Array[Byte] = {
     val response = kvClient.get(ByteSequence.from(path.getBytes())).get()
     if (response.getKvs.isEmpty) {
       throw new KyuubiException(s"Key[$path] not exists in ETCD, please check it.")
@@ -125,12 +125,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def setData(path: String, data: Array[Byte]): Boolean = {
+  override def setData(path: String, data: Array[Byte]): Boolean = {
     val response = kvClient.put(ByteSequence.from(path.getBytes), ByteSequence.from(data)).get()
     response != null
   }
 
-  def getChildren(path: String): List[String] = {
+  override def getChildren(path: String): List[String] = {
     val kvs = kvClient.get(
       ByteSequence.from(path.getBytes()),
       GetOption.newBuilder().isPrefix(true).build()).get().getKvs
@@ -142,25 +142,25 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def pathExists(path: String): Boolean = {
+  override def pathExists(path: String): Boolean = {
     !pathNonExists(path)
   }
 
-  def pathNonExists(path: String): Boolean = {
+  override def pathNonExists(path: String): Boolean = {
     kvClient.get(ByteSequence.from(path.getBytes())).get().getKvs.isEmpty
   }
 
-  def delete(path: String, deleteChildren: Boolean = false): Unit = {
+  override def delete(path: String, deleteChildren: Boolean = false): Unit = {
     kvClient.delete(
       ByteSequence.from(path.getBytes()),
       DeleteOption.newBuilder().isPrefix(deleteChildren).build()).get()
   }
 
-  def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
+  override def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
     // not need with etcd
   }
 
-  def tryWithLock[T](
+  override def tryWithLock[T](
       lockPath: String,
       timeout: Long)(f: => T): T = {
     // the default unit is millis, covert to seconds.
@@ -195,7 +195,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def getServerHost(namespace: String): Option[(String, Int)] = {
+  override def getServerHost(namespace: String): Option[(String, Int)] = {
     // TODO: use last one because to avoid touching some maybe-crashed engines
     // We need a big improvement here.
     getServiceNodesInfo(namespace, Some(1), silent = true) match {
@@ -204,7 +204,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def getEngineByRefId(
+  override def getEngineByRefId(
       namespace: String,
       engineRefId: String): Option[(String, Int)] = {
     getServiceNodesInfo(namespace, silent = true)
@@ -212,7 +212,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
       .map(data => (data.host, data.port))
   }
 
-  def getServiceNodesInfo(
+  override def getServiceNodesInfo(
       namespace: String,
       sizeOpt: Option[Int] = None,
       silent: Boolean = false): Seq[ServiceNodeInfo] = {
@@ -241,7 +241,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def registerService(
+  override def registerService(
       conf: KyuubiConf,
       namespace: String,
       serviceDiscovery: ServiceDiscovery,
@@ -267,7 +267,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def deregisterService(): Unit = {
+  override def deregisterService(): Unit = {
     // close the EPHEMERAL_SEQUENTIAL node in etcd
     if (serviceNode != null) {
       if (serviceNode.lease != LEASE_NULL_VALUE) {
@@ -278,7 +278,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def postDeregisterService(namespace: String): Boolean = {
+  override def postDeregisterService(namespace: String): Boolean = {
     if (namespace != null) {
       delete(DiscoveryPaths.makePath(null, namespace), true)
       true
@@ -287,7 +287,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def createAndGetServiceNode(
+  override def createAndGetServiceNode(
       conf: KyuubiConf,
       namespace: String,
       instance: String,
@@ -297,7 +297,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
   }
 
   @VisibleForTesting
-  def startSecretNode(
+  override def startSecretNode(
       createMode: String,
       basePath: String,
       initData: String,
@@ -307,7 +307,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
       ByteSequence.from(initData.getBytes())).get()
   }
 
-  def getAndIncrement(path: String, delta: Int = 1): Int = {
+  override def getAndIncrement(path: String, delta: Int = 1): Int = {
     val lockPath = s"${path}_tmp_for_lock"
     tryWithLock(lockPath, 60 * 1000) {
       if (pathNonExists(path)) {
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index 1315cf029..daa27047e 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -66,17 +66,17 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
   @volatile private var serviceNode: PersistentNode = _
   private var watcher: DeRegisterWatcher = _
 
-  def createClient(): Unit = {
+  override def createClient(): Unit = {
     zkClient.start()
   }
 
-  def closeClient(): Unit = {
+  override def closeClient(): Unit = {
     if (zkClient != null) {
       zkClient.close()
     }
   }
 
-  def create(path: String, mode: String, createParent: Boolean = true): String = {
+  override def create(path: String, mode: String, createParent: Boolean = true): String = {
     val builder =
       if (createParent) zkClient.create().creatingParentsIfNeeded() else zkClient.create()
     builder
@@ -84,27 +84,27 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
       .forPath(path)
   }
 
-  def getData(path: String): Array[Byte] = {
+  override def getData(path: String): Array[Byte] = {
     zkClient.getData.forPath(path)
   }
 
-  def setData(path: String, data: Array[Byte]): Boolean = {
+  override def setData(path: String, data: Array[Byte]): Boolean = {
     zkClient.setData().forPath(path, data) != null
   }
 
-  def getChildren(path: String): List[String] = {
+  override def getChildren(path: String): List[String] = {
     zkClient.getChildren.forPath(path).asScala.toList
   }
 
-  def pathExists(path: String): Boolean = {
+  override def pathExists(path: String): Boolean = {
     zkClient.checkExists().forPath(path) != null
   }
 
-  def pathNonExists(path: String): Boolean = {
+  override def pathNonExists(path: String): Boolean = {
     zkClient.checkExists().forPath(path) == null
   }
 
-  def delete(path: String, deleteChildren: Boolean = false): Unit = {
+  override def delete(path: String, deleteChildren: Boolean = false): Unit = {
     if (deleteChildren) {
       zkClient.delete().deletingChildrenIfNeeded().forPath(path)
     } else {
@@ -112,7 +112,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
+  override def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
     zkClient
       .getConnectionStateListenable.addListener(new ConnectionStateListener {
         private val isConnected = new AtomicBoolean(false)
@@ -141,7 +141,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
       })
   }
 
-  def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
+  override def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
     var lock: InterProcessSemaphoreMutex = null
     try {
       try {
@@ -189,7 +189,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def getServerHost(namespace: String): Option[(String, Int)] = {
+  override def getServerHost(namespace: String): Option[(String, Int)] = {
     // TODO: use last one because to avoid touching some maybe-crashed engines
     // We need a big improvement here.
     getServiceNodesInfo(namespace, Some(1), silent = true) match {
@@ -198,7 +198,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def getEngineByRefId(
+  override def getEngineByRefId(
       namespace: String,
       engineRefId: String): Option[(String, Int)] = {
     getServiceNodesInfo(namespace, silent = true)
@@ -206,7 +206,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
       .map(data => (data.host, data.port))
   }
 
-  def getServiceNodesInfo(
+  override def getServiceNodesInfo(
       namespace: String,
       sizeOpt: Option[Int] = None,
       silent: Boolean = false): Seq[ServiceNodeInfo] = {
@@ -235,7 +235,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def registerService(
+  override def registerService(
       conf: KyuubiConf,
       namespace: String,
       serviceDiscovery: ServiceDiscovery,
@@ -254,7 +254,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     watchNode()
   }
 
-  def deregisterService(): Unit = {
+  override def deregisterService(): Unit = {
     // close the EPHEMERAL_SEQUENTIAL node in zk
     if (serviceNode != null) {
       try {
@@ -268,7 +268,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def postDeregisterService(namespace: String): Boolean = {
+  override def postDeregisterService(namespace: String): Boolean = {
     if (namespace != null) {
       try {
         delete(namespace, true)
@@ -283,7 +283,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     }
   }
 
-  def createAndGetServiceNode(
+  override def createAndGetServiceNode(
       conf: KyuubiConf,
       namespace: String,
       instance: String,
@@ -293,7 +293,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
   }
 
   @VisibleForTesting
-  def startSecretNode(
+  override def startSecretNode(
       createMode: String,
       basePath: String,
       initData: String,
@@ -307,7 +307,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
     secretNode.start()
   }
 
-  def getAndIncrement(path: String, delta: Int = 1): Int = {
+  override def getAndIncrement(path: String, delta: Int = 1): Int = {
     val dai = new DistributedAtomicInteger(zkClient, path, new RetryForever(1000))
     var atomicVal: AtomicValue[Integer] = null
     do {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 565f41ff2..84b7707e8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -74,7 +74,7 @@ private[kyuubi] class EngineRef(
 
   private val enginePoolIgnoreSubdomain: Boolean = conf.get(ENGINE_POOL_IGNORE_SUBDOMAIN)
 
-  private val enginePoolBalancePolicy: String = conf.get(ENGINE_POOL_BALANCE_POLICY)
+  private val enginePoolSelectPolicy: String = conf.get(ENGINE_POOL_SELECT_POLICY)
 
   // In case the multi kyuubi instances have the small gap of timeout, here we add
   // a small amount of time for timeout
@@ -97,12 +97,11 @@ private[kyuubi] class EngineRef(
         warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to " +
           s"system threshold $poolThreshold")
       }
-      val seqNum = enginePoolBalancePolicy match {
+      val seqNum = enginePoolSelectPolicy match {
         case "POLLING" =>
           val snPath =
             DiscoveryPaths.makePath(
-              s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType",
-              "seq_num",
+              s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum",
               appUser,
               clientPoolName)
           DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
@@ -159,8 +158,7 @@ private[kyuubi] class EngineRef(
       case _ =>
         val lockPath =
           DiscoveryPaths.makePath(
-            s"${serverSpace}_${shareLevel}_$engineType",
-            "lock",
+            s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock",
             appUser,
             subdomain)
         discoveryClient.tryWithLock(
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
index 5d8ae3177..5ca8723f5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
@@ -204,7 +204,7 @@ trait EngineRefTests extends KyuubiFunSuite {
     conf.set(ENGINE_POOL_NAME, pool_name)
     conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test")
     conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
-    conf.set(ENGINE_POOL_BALANCE_POLICY, "POLLING")
+    conf.set(ENGINE_POOL_SELECT_POLICY, "POLLING")
     (0 until (10)).foreach { i =>
       val engine7 = new EngineRef(conf, user, "grp", id, null)
       val engineNumber = Integer.parseInt(engine7.subdomain.substring(pool_name.length + 1))