You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/05/31 12:37:37 UTC

[kyuubi] branch master updated: [KYUUBI #4905] Generalize util method for loading class from service loader

This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new cf886c967 [KYUUBI #4905] Generalize util method for loading class from service loader
cf886c967 is described below

commit cf886c96765971403afd2f271c9d5a5d9f8620b4
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Wed May 31 20:37:26 2023 +0800

    [KYUUBI #4905] Generalize util method for loading class from service loader
    
    ### _Why are the changes needed?_
    
    - Generalize util method for loading class from service loader in `kyuubi-util-scala` module
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4905 from bowenliang123/service-load-util.
    
    Closes #4905
    
    545183fbf [liangbowen] nit
    8714e0591 [liangbowen] rename loadClassFromServiceLoader to loadFromServiceLoader
    11936419e [liangbowen] nit
    81584e335 [liangbowen] fix loadExtractorsToMap
    1d64b662d [liangbowen] fix
    b7d8895d3 [liangbowen] update
    e15b7d22c [liangbowen] optimize JpsApplicationOperationSuite
    c58ef573c [liangbowen] simplify ConnectionProvider.loadProviders
    31de53df8 [liangbowen] nit
    fca265998 [liangbowen] simplify
    1fada9516 [liangbowen] import
    323b2bd0e [liangbowen] generalize util method for loading class from service loader
    
    Authored-by: liangbowen <li...@gf.com.cn>
    Signed-off-by: liangbowen <li...@gf.com.cn>
---
 .../kyuubi/plugin/spark/authz/serde/package.scala  | 11 ++----
 .../jdbc/connection/ConnectionProvider.scala       | 29 ++++-----------
 .../kyuubi/engine/jdbc/dialect/JdbcDialect.scala   |  9 ++---
 kyuubi-events/pom.xml                              |  6 ++++
 .../kyuubi/events/handler/EventHandlerLoader.scala | 42 +++++++++-------------
 .../credentials/HadoopCredentialsManager.scala     | 12 +++----
 .../kyuubi/engine/KyuubiApplicationManager.scala   | 10 +++---
 .../engine/JpsApplicationOperationSuite.scala      |  9 +++--
 .../apache/kyuubi/util/reflect/ReflectUtils.scala  | 19 +++++++++-
 9 files changed, 64 insertions(+), 83 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
index a52a558a0..d9f646900 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
@@ -17,9 +17,6 @@
 
 package org.apache.kyuubi.plugin.spark.authz
 
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import com.fasterxml.jackson.core.`type`.TypeReference
@@ -28,16 +25,14 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.kyuubi.plugin.spark.authz.OperationType.{OperationType, QUERY}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 package object serde {
 
   final val mapper = JsonMapper.builder().addModule(DefaultScalaModule).build()
 
-  def loadExtractorsToMap[T <: Extractor](implicit ct: ClassTag[T]): Map[String, T] = {
-    ServiceLoader.load(ct.runtimeClass).iterator().asScala
-      .map { case e: Extractor => (e.key, e.asInstanceOf[T]) }
-      .toMap
-  }
+  def loadExtractorsToMap[T <: Extractor](implicit ct: ClassTag[T]): Map[String, T] =
+    loadFromServiceLoader[T]()(ct).map { e: T => (e.key, e) }.toMap
 
   final lazy val DB_COMMAND_SPECS: Map[String, DatabaseCommandSpec] = {
     val is = getClass.getClassLoader.getResourceAsStream("database_command_spec.json")
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
index 798c92fbe..0dea6a2c1 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
@@ -17,13 +17,11 @@
 package org.apache.kyuubi.engine.jdbc.connection
 
 import java.sql.{Connection, DriverManager}
-import java.util.ServiceLoader
-
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PROVIDER, ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_DRIVER_CLASS}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 abstract class AbstractConnectionProvider extends Logging {
   protected val providers = loadProviders()
@@ -69,27 +67,12 @@ abstract class AbstractConnectionProvider extends Logging {
     selectedProvider.getConnection(kyuubiConf)
   }
 
-  def loadProviders(): Seq[JdbcConnectionProvider] = {
-    val loader = ServiceLoader.load(
-      classOf[JdbcConnectionProvider],
-      Thread.currentThread().getContextClassLoader)
-    val providers = ArrayBuffer[JdbcConnectionProvider]()
-
-    val iterator = loader.iterator()
-    while (iterator.hasNext) {
-      try {
-        val provider = iterator.next()
+  def loadProviders(): Seq[JdbcConnectionProvider] =
+    loadFromServiceLoader[JdbcConnectionProvider]()
+      .map { provider =>
         info(s"Loaded provider: $provider")
-        providers += provider
-      } catch {
-        case t: Throwable =>
-          warn(s"Loaded of the provider failed with the exception", t)
-      }
-    }
-
-    // TODO support disable provider
-    providers
-  }
+        provider
+      }.toSeq
 }
 
 object ConnectionProvider extends AbstractConnectionProvider
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index b7ac7f43b..e08b22758 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -18,9 +18,6 @@ package org.apache.kyuubi.engine.jdbc.dialect
 
 import java.sql.{Connection, Statement}
 import java.util
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
@@ -29,6 +26,7 @@ import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
 import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
 import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 abstract class JdbcDialect extends SupportServiceLoader with Logging {
 
@@ -75,9 +73,8 @@ object JdbcDialects extends Logging {
       assert(url.length > 5 && url.substring(5).contains(":"))
       url.substring(5, url.indexOf(":", 5))
     }
-    val serviceLoader =
-      ServiceLoader.load(classOf[JdbcDialect], Thread.currentThread().getContextClassLoader)
-    serviceLoader.asScala.filter(_.name().equalsIgnoreCase(shortName)).toList match {
+    loadFromServiceLoader[JdbcDialect]()
+      .filter(_.name().equalsIgnoreCase(shortName)).toList match {
       case Nil =>
         throw new KyuubiException(s"Don't find jdbc dialect implement for jdbc engine: $shortName.")
       case head :: Nil =>
diff --git a/kyuubi-events/pom.xml b/kyuubi-events/pom.xml
index 6b51fe015..760955fc6 100644
--- a/kyuubi-events/pom.xml
+++ b/kyuubi-events/pom.xml
@@ -37,6 +37,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala
index c81dcfb9b..ea4110455 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala
@@ -16,40 +16,30 @@
  */
 package org.apache.kyuubi.events.handler
 
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 object EventHandlerLoader extends Logging {
 
   def loadCustom(kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
-    val providers = ArrayBuffer[CustomEventHandlerProvider]()
-    ServiceLoader.load(
-      classOf[CustomEventHandlerProvider],
-      Utils.getContextOrKyuubiClassLoader)
-      .iterator()
-      .asScala
-      .foreach(provider => providers += provider)
-
-    providers.map { provider =>
-      Try {
-        provider.create(kyuubiConf)
-      } match {
-        case Success(value) =>
-          value
-        case Failure(exception) =>
-          warn(
-            s"Failed to create an EventHandler by the ${provider.getClass.getName}," +
-              s" it will be ignored.",
-            exception)
-          null
-      }
-    }.filter(_ != null)
+    loadFromServiceLoader[CustomEventHandlerProvider](Utils.getContextOrKyuubiClassLoader)
+      .map { provider =>
+        Try {
+          provider.create(kyuubiConf)
+        } match {
+          case Success(value) =>
+            value
+          case Failure(exception) =>
+            warn(
+              s"Failed to create an EventHandler by the ${provider.getClass.getName}," +
+                s" it will be ignored.",
+              exception)
+            null
+        }
+      }.filter(_ != null).toSeq
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index fe710e678..b51255b71 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -17,13 +17,11 @@
 
 package org.apache.kyuubi.credentials
 
-import java.util.ServiceLoader
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.Promise
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.duration.Duration
 import scala.util.{Failure, Success, Try}
 
@@ -35,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 /**
  * [[HadoopCredentialsManager]] manages and renews delegation tokens, which are used by SQL engines
@@ -315,13 +314,10 @@ object HadoopCredentialsManager extends Logging {
   private val providerEnabledConfig = "kyuubi.credentials.%s.enabled"
 
   def loadProviders(kyuubiConf: KyuubiConf): Map[String, HadoopDelegationTokenProvider] = {
-    val loader =
-      ServiceLoader.load(
-        classOf[HadoopDelegationTokenProvider],
-        Utils.getContextOrKyuubiClassLoader)
     val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]()
 
-    val iterator = loader.iterator
+    val iterator =
+      loadFromServiceLoader[HadoopDelegationTokenProvider](Utils.getContextOrKyuubiClassLoader)
     while (iterator.hasNext) {
       try {
         providers += iterator.next
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 8c92b77ef..ac7225dd8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -20,9 +20,8 @@ package org.apache.kyuubi.engine
 import java.io.File
 import java.net.{URI, URISyntaxException}
 import java.nio.file.{Files, Path}
-import java.util.{Locale, ServiceLoader}
+import java.util.Locale
 
-import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.kyuubi.{KyuubiException, Utils}
@@ -31,14 +30,13 @@ import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQ
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.service.AbstractService
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") {
 
   // TODO: maybe add a configuration is better
-  private val operations = {
-    ServiceLoader.load(classOf[ApplicationOperation], Utils.getContextOrKyuubiClassLoader)
-      .iterator().asScala.toSeq
-  }
+  private val operations =
+    loadFromServiceLoader[ApplicationOperation](Utils.getContextOrKyuubiClassLoader).toSeq
 
   override def initialize(conf: KyuubiConf): Unit = {
     operations.foreach { op =>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index 22e711963..a6e00bbaf 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -19,9 +19,8 @@ package org.apache.kyuubi.engine
 
 import java.lang.management.ManagementFactory
 import java.time.Duration
-import java.util.{ServiceLoader, UUID}
+import java.util.UUID
 
-import scala.collection.JavaConverters._
 import scala.sys.process._
 
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -31,11 +30,11 @@ import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.SESSION_IDLE_TIMEOUT
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 class JpsApplicationOperationSuite extends KyuubiFunSuite {
-  private val operations = ServiceLoader.load(classOf[ApplicationOperation])
-    .asScala.filter(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation]))
-  private val jps = operations.head
+  private val jps = loadFromServiceLoader[ApplicationOperation]()
+    .find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get
   jps.initialize(null)
 
   test("JpsApplicationOperation with jstat") {
diff --git a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
index 2c46c775b..6e306e371 100644
--- a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
+++ b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
@@ -17,8 +17,11 @@
 
 package org.apache.kyuubi.util.reflect
 
-import scala.util.{Failure, Success, Try}
+import java.util.ServiceLoader
 
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
 object ReflectUtils {
 
   /**
@@ -60,4 +63,18 @@ object ReflectUtils {
 
   def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T =
     invoke(target, methodName, args: _*).asInstanceOf[T]
+
+  /**
+   * Creates an iterator from a new service loader for the given service type and class
+   * loader.
+   *
+   * @param cl The class loader to be used to load provider-configuration files
+   *           and provider classes
+   * @param ct class tag of the generics class type
+   * @tparam T the class of the service type
+   * @return an iterator over the providers of the service loader, each cast to `T`
+   */
+  def loadFromServiceLoader[T](cl: ClassLoader = Thread.currentThread().getContextClassLoader)(
+      implicit ct: ClassTag[T]): Iterator[T] =
+    ServiceLoader.load(ct.runtimeClass, cl).iterator().asScala.map(_.asInstanceOf[T])
 }