You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/05/31 12:37:37 UTC
[kyuubi] branch master updated: [KYUUBI #4905] Generalize util method for loading class from service loader
This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new cf886c967 [KYUUBI #4905] Generalize util method for loading class from service loader
cf886c967 is described below
commit cf886c96765971403afd2f271c9d5a5d9f8620b4
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Wed May 31 20:37:26 2023 +0800
[KYUUBI #4905] Generalize util method for loading class from service loader
### _Why are the changes needed?_
- Generalize util method for loading class from service loader in `kyuubi-util-scala` module
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4905 from bowenliang123/service-load-util.
Closes #4905
545183fbf [liangbowen] nit
8714e0591 [liangbowen] rename loadClassFromServiceLoader to loadFromServiceLoader
11936419e [liangbowen] nit
81584e335 [liangbowen] fix loadExtractorsToMap
1d64b662d [liangbowen] fix
b7d8895d3 [liangbowen] update
e15b7d22c [liangbowen] optimize JpsApplicationOperationSuite
c58ef573c [liangbowen] simplify ConnectionProvider.loadProviders
31de53df8 [liangbowen] nit
fca265998 [liangbowen] simplify
1fada9516 [liangbowen] import
323b2bd0e [liangbowen] generalize util method for loading class from service loader
Authored-by: liangbowen <li...@gf.com.cn>
Signed-off-by: liangbowen <li...@gf.com.cn>
---
.../kyuubi/plugin/spark/authz/serde/package.scala | 11 ++----
.../jdbc/connection/ConnectionProvider.scala | 29 ++++-----------
.../kyuubi/engine/jdbc/dialect/JdbcDialect.scala | 9 ++---
kyuubi-events/pom.xml | 6 ++++
.../kyuubi/events/handler/EventHandlerLoader.scala | 42 +++++++++-------------
.../credentials/HadoopCredentialsManager.scala | 12 +++----
.../kyuubi/engine/KyuubiApplicationManager.scala | 10 +++---
.../engine/JpsApplicationOperationSuite.scala | 9 +++--
.../apache/kyuubi/util/reflect/ReflectUtils.scala | 19 +++++++++-
9 files changed, 64 insertions(+), 83 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
index a52a558a0..d9f646900 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
@@ -17,9 +17,6 @@
package org.apache.kyuubi.plugin.spark.authz
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import com.fasterxml.jackson.core.`type`.TypeReference
@@ -28,16 +25,14 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.kyuubi.plugin.spark.authz.OperationType.{OperationType, QUERY}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
package object serde {
final val mapper = JsonMapper.builder().addModule(DefaultScalaModule).build()
- def loadExtractorsToMap[T <: Extractor](implicit ct: ClassTag[T]): Map[String, T] = {
- ServiceLoader.load(ct.runtimeClass).iterator().asScala
- .map { case e: Extractor => (e.key, e.asInstanceOf[T]) }
- .toMap
- }
+ def loadExtractorsToMap[T <: Extractor](implicit ct: ClassTag[T]): Map[String, T] =
+ loadFromServiceLoader[T]()(ct).map { e: T => (e.key, e) }.toMap
final lazy val DB_COMMAND_SPECS: Map[String, DatabaseCommandSpec] = {
val is = getClass.getClassLoader.getResourceAsStream("database_command_spec.json")
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
index 798c92fbe..0dea6a2c1 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
@@ -17,13 +17,11 @@
package org.apache.kyuubi.engine.jdbc.connection
import java.sql.{Connection, DriverManager}
-import java.util.ServiceLoader
-
-import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PROVIDER, ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_DRIVER_CLASS}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
abstract class AbstractConnectionProvider extends Logging {
protected val providers = loadProviders()
@@ -69,27 +67,12 @@ abstract class AbstractConnectionProvider extends Logging {
selectedProvider.getConnection(kyuubiConf)
}
- def loadProviders(): Seq[JdbcConnectionProvider] = {
- val loader = ServiceLoader.load(
- classOf[JdbcConnectionProvider],
- Thread.currentThread().getContextClassLoader)
- val providers = ArrayBuffer[JdbcConnectionProvider]()
-
- val iterator = loader.iterator()
- while (iterator.hasNext) {
- try {
- val provider = iterator.next()
+ def loadProviders(): Seq[JdbcConnectionProvider] =
+ loadFromServiceLoader[JdbcConnectionProvider]()
+ .map { provider =>
info(s"Loaded provider: $provider")
- providers += provider
- } catch {
- case t: Throwable =>
- warn(s"Loaded of the provider failed with the exception", t)
- }
- }
-
- // TODO support disable provider
- providers
- }
+ provider
+ }.toSeq
}
object ConnectionProvider extends AbstractConnectionProvider
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index b7ac7f43b..e08b22758 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -18,9 +18,6 @@ package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, Statement}
import java.util
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
@@ -29,6 +26,7 @@ import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.ReflectUtils._
abstract class JdbcDialect extends SupportServiceLoader with Logging {
@@ -75,9 +73,8 @@ object JdbcDialects extends Logging {
assert(url.length > 5 && url.substring(5).contains(":"))
url.substring(5, url.indexOf(":", 5))
}
- val serviceLoader =
- ServiceLoader.load(classOf[JdbcDialect], Thread.currentThread().getContextClassLoader)
- serviceLoader.asScala.filter(_.name().equalsIgnoreCase(shortName)).toList match {
+ loadFromServiceLoader[JdbcDialect]()
+ .filter(_.name().equalsIgnoreCase(shortName)).toList match {
case Nil =>
throw new KyuubiException(s"Don't find jdbc dialect implement for jdbc engine: $shortName.")
case head :: Nil =>
diff --git a/kyuubi-events/pom.xml b/kyuubi-events/pom.xml
index 6b51fe015..760955fc6 100644
--- a/kyuubi-events/pom.xml
+++ b/kyuubi-events/pom.xml
@@ -37,6 +37,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala
index c81dcfb9b..ea4110455 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/EventHandlerLoader.scala
@@ -16,40 +16,30 @@
*/
package org.apache.kyuubi.events.handler
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.util.reflect.ReflectUtils._
object EventHandlerLoader extends Logging {
def loadCustom(kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
- val providers = ArrayBuffer[CustomEventHandlerProvider]()
- ServiceLoader.load(
- classOf[CustomEventHandlerProvider],
- Utils.getContextOrKyuubiClassLoader)
- .iterator()
- .asScala
- .foreach(provider => providers += provider)
-
- providers.map { provider =>
- Try {
- provider.create(kyuubiConf)
- } match {
- case Success(value) =>
- value
- case Failure(exception) =>
- warn(
- s"Failed to create an EventHandler by the ${provider.getClass.getName}," +
- s" it will be ignored.",
- exception)
- null
- }
- }.filter(_ != null)
+ loadFromServiceLoader[CustomEventHandlerProvider](Utils.getContextOrKyuubiClassLoader)
+ .map { provider =>
+ Try {
+ provider.create(kyuubiConf)
+ } match {
+ case Success(value) =>
+ value
+ case Failure(exception) =>
+ warn(
+ s"Failed to create an EventHandler by the ${provider.getClass.getName}," +
+ s" it will be ignored.",
+ exception)
+ null
+ }
+ }.filter(_ != null).toSeq
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index fe710e678..b51255b71 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -17,13 +17,11 @@
package org.apache.kyuubi.credentials
-import java.util.ServiceLoader
import java.util.concurrent._
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.Promise
+import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
@@ -35,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
/**
* [[HadoopCredentialsManager]] manages and renews delegation tokens, which are used by SQL engines
@@ -315,13 +314,10 @@ object HadoopCredentialsManager extends Logging {
private val providerEnabledConfig = "kyuubi.credentials.%s.enabled"
def loadProviders(kyuubiConf: KyuubiConf): Map[String, HadoopDelegationTokenProvider] = {
- val loader =
- ServiceLoader.load(
- classOf[HadoopDelegationTokenProvider],
- Utils.getContextOrKyuubiClassLoader)
val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]()
- val iterator = loader.iterator
+ val iterator =
+ loadFromServiceLoader[HadoopDelegationTokenProvider](Utils.getContextOrKyuubiClassLoader)
while (iterator.hasNext) {
try {
providers += iterator.next
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 8c92b77ef..ac7225dd8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -20,9 +20,8 @@ package org.apache.kyuubi.engine
import java.io.File
import java.net.{URI, URISyntaxException}
import java.nio.file.{Files, Path}
-import java.util.{Locale, ServiceLoader}
+import java.util.Locale
-import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.kyuubi.{KyuubiException, Utils}
@@ -31,14 +30,13 @@ import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQ
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.service.AbstractService
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") {
// TODO: maybe add a configuration is better
- private val operations = {
- ServiceLoader.load(classOf[ApplicationOperation], Utils.getContextOrKyuubiClassLoader)
- .iterator().asScala.toSeq
- }
+ private val operations =
+ loadFromServiceLoader[ApplicationOperation](Utils.getContextOrKyuubiClassLoader).toSeq
override def initialize(conf: KyuubiConf): Unit = {
operations.foreach { op =>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index 22e711963..a6e00bbaf 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -19,9 +19,8 @@ package org.apache.kyuubi.engine
import java.lang.management.ManagementFactory
import java.time.Duration
-import java.util.{ServiceLoader, UUID}
+import java.util.UUID
-import scala.collection.JavaConverters._
import scala.sys.process._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -31,11 +30,11 @@ import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SESSION_IDLE_TIMEOUT
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class JpsApplicationOperationSuite extends KyuubiFunSuite {
- private val operations = ServiceLoader.load(classOf[ApplicationOperation])
- .asScala.filter(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation]))
- private val jps = operations.head
+ private val jps = loadFromServiceLoader[ApplicationOperation]()
+ .find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get
jps.initialize(null)
test("JpsApplicationOperation with jstat") {
diff --git a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
index 2c46c775b..6e306e371 100644
--- a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
+++ b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
@@ -17,8 +17,11 @@
package org.apache.kyuubi.util.reflect
-import scala.util.{Failure, Success, Try}
+import java.util.ServiceLoader
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
object ReflectUtils {
/**
@@ -60,4 +63,18 @@ object ReflectUtils {
def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T =
invoke(target, methodName, args: _*).asInstanceOf[T]
+
+ /**
+ * Creates a iterator for with a new service loader for the given service type and class
+ * loader.
+ *
+ * @param cl The class loader to be used to load provider-configuration files
+ * and provider classes
+ * @param ct class tag of the generics class type
+ * @tparam T the class of the service type
+ * @return
+ */
+ def loadFromServiceLoader[T](cl: ClassLoader = Thread.currentThread().getContextClassLoader)(
+ implicit ct: ClassTag[T]): Iterator[T] =
+ ServiceLoader.load(ct.runtimeClass, cl).iterator().asScala.map(_.asInstanceOf[T])
}