You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/09/07 06:24:09 UTC

[incubator-linkis] branch dev-1.3.0 updated: fix(pes): metadataConnect Rpc call failed (#3253)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new cea55fe22 fix(pes): metadataConnect Rpc call failed (#3253)
cea55fe22 is described below

commit cea55fe22e243319f30ba0f3f4c096bde8b029a6
Author: Jack Xu <xu...@126.com>
AuthorDate: Wed Sep 7 14:24:04 2022 +0800

    fix(pes): metadataConnect Rpc call failed (#3253)
    
    * fix(pes): metadataConnect Rpc call failed
---
 linkis-commons/linkis-protocol/pom.xml             |   1 +
 .../org/apache/linkis/rpc/RPCReceiveRestful.scala  |  21 ++--
 .../apache/linkis/rpc/transform/RPCConsumer.scala  |   4 +
 linkis-commons/linkis-scheduler/pom.xml            |   1 +
 .../engineconn/launch/EngineConnServer.scala       |   2 +-
 .../callback/hook/CallbackEngineConnHook.scala     |   2 +-
 .../linkis-jdbc-driver/pom.xml                     |   2 +-
 linkis-dist/pom.xml                                |   1 -
 .../linkis-datasource-manager/common/pom.xml       |   9 ++
 .../common/protocol/DsmQueryProtocol.scala         |  12 +-
 .../core/receivers/DsmReceiver.scala               | 125 ++++++++-------------
 .../linkis-metadata-query/common/pom.xml           |   2 +-
 .../common/protocol/MetadataOperateProtocol.scala  |   4 +-
 .../query/server/receiver/BaseMetaReceiver.scala   |  62 +++++-----
 14 files changed, 121 insertions(+), 127 deletions(-)

diff --git a/linkis-commons/linkis-protocol/pom.xml b/linkis-commons/linkis-protocol/pom.xml
index 6f36aee03..82187a6a3 100644
--- a/linkis-commons/linkis-protocol/pom.xml
+++ b/linkis-commons/linkis-protocol/pom.xml
@@ -21,6 +21,7 @@
     <groupId>org.apache.linkis</groupId>
     <artifactId>linkis</artifactId>
     <version>1.3.0</version>
+    <relativePath>../../pom.xml</relativePath>
   </parent>
 
   <artifactId>linkis-protocol</artifactId>
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
index ab92151b3..4bc97e8d7 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
@@ -139,7 +139,7 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
       })
     }
 
-  private implicit def toMessage(obj: Any): Message = obj match {
+  private def toMessage(obj: Any): Message = obj match {
     case Unit | () | null =>
       RPCProduct.getRPCProduct.ok()
     case _: BoxedUnit => RPCProduct.getRPCProduct.ok()
@@ -152,29 +152,36 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
     val obj = RPCConsumer.getRPCConsumer.toObject(message)
     val event = RPCMessageEvent(obj, BaseRPCSender.getInstanceInfo(message.getData))
     rpcReceiverListenerBus.post(event)
+    toMessage(Unit)
   }
 
-  private def receiveAndReply(
+  private def receiveAndReplyWithMessage(
       message: Message,
-      opEvent: (Receiver, Any, Sender) => Message
+      opEvent: (Receiver, Any, Sender) => Any
   ): Message = catchIt {
     val obj = RPCConsumer.getRPCConsumer.toObject(message)
     val serviceInstance = BaseRPCSender.getInstanceInfo(message.getData)
     val event = RPCMessageEvent(obj, serviceInstance)
-    event.map(opEvent(_, obj, event)).getOrElse(RPCProduct.getRPCProduct.notFound())
+    event
+      .map(receiver => {
+        logger.debug("show the receiver {}", receiver.getClass)
+        toMessage(opEvent(receiver, obj, event))
+      })
+      .getOrElse(RPCProduct.getRPCProduct.notFound())
   }
 
   @RequestMapping(path = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST))
   override def receiveAndReply(@RequestBody message: Message): Message =
-    receiveAndReply(message, _.receiveAndReply(_, _))
+    receiveAndReplyWithMessage(message, _.receiveAndReply(_, _))
 
   @RequestMapping(path = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST))
   override def receiveAndReplyInMills(@RequestBody message: Message): Message = catchIt {
     val duration = message.getData.get("duration")
-    if (duration == null || StringUtils.isEmpty(duration.toString))
+    if (duration == null || StringUtils.isEmpty(duration.toString)) {
       throw new DWCURIException(10002, "The timeout period is not set!(超时时间未设置!)")
+    }
     val timeout = Duration(duration.toString.toLong, TimeUnit.MILLISECONDS)
-    receiveAndReply(message, _.receiveAndReply(_, timeout, _))
+    receiveAndReplyWithMessage(message, _.receiveAndReply(_, timeout, _))
   }
 
 }
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
index 4184d1b55..3191da79a 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
@@ -25,6 +25,8 @@ import org.apache.linkis.server.{EXCEPTION_MSG, JMap, Message}
 
 import scala.runtime.BoxedUnit
 
+import org.slf4j.LoggerFactory
+
 private[linkis] trait RPCConsumer {
 
   def toObject(message: Message): Any
@@ -32,6 +34,7 @@ private[linkis] trait RPCConsumer {
 }
 
 private[linkis] object RPCConsumer {
+  private val logger = LoggerFactory.getLogger(getClass)
   import RPCProduct._
 
   private val rpcConsumer: RPCConsumer = new RPCConsumer {
@@ -43,6 +46,7 @@ private[linkis] object RPCConsumer {
           if (data.isEmpty) return BoxedUnit.UNIT
           val objectStr = data.get(OBJECT_VALUE).toString
           val objectClass = data.get(CLASS_VALUE).toString
+          logger.debug("The corresponding anti-sequence is class {}", objectClass)
           val clazz = Utils.tryThrow(Class.forName(objectClass)) {
             case _: ClassNotFoundException =>
               new DWCURIException(
diff --git a/linkis-commons/linkis-scheduler/pom.xml b/linkis-commons/linkis-scheduler/pom.xml
index 73ce5f6a1..9991e24c3 100644
--- a/linkis-commons/linkis-scheduler/pom.xml
+++ b/linkis-commons/linkis-scheduler/pom.xml
@@ -21,6 +21,7 @@
     <groupId>org.apache.linkis</groupId>
     <artifactId>linkis</artifactId>
     <version>1.3.0</version>
+    <relativePath>../../pom.xml</relativePath>
   </parent>
   <artifactId>linkis-scheduler</artifactId>
 
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
index f37d25d5f..88f9112b3 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
@@ -80,7 +80,7 @@ object EngineConnServer extends Logging {
       logger.info("Finished to execute hook of beforeCreateEngineConn.")
       // 2. cresate EngineConn
       val engineConn = getEngineConnManager.createEngineConn(getEngineCreationContext)
-      logger.info(s"Finished to create ${engineConn.getEngineConnType}EngineConn.")
+      logger.info(s"Finished to create ${engineConn.getEngineConnType} EngineConn.")
       EngineConnHook.getEngineConnHooks.foreach(
         _.beforeExecutionExecute(getEngineCreationContext, engineConn)
       )
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
index 777f1af44..d7ad2c797 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
@@ -47,7 +47,7 @@ class CallbackEngineConnHook extends EngineConnHook with Logging {
     val parser = DWCArgumentsParser.parse(engineCreationContext.getArgs)
     DWCArgumentsParser.setDWCOptionMap(parser.getDWCConfMap)
     val existsExcludePackages = ServerConfiguration.BDP_SERVER_EXCLUDE_PACKAGES.getValue
-    if (!StringUtils.isEmpty(existsExcludePackages)) {
+    if (StringUtils.isNotBlank(existsExcludePackages)) {
       DataWorkCloudApplication.setProperty(
         ServerConfiguration.BDP_SERVER_EXCLUDE_PACKAGES.key,
         existsExcludePackages
diff --git a/linkis-computation-governance/linkis-jdbc-driver/pom.xml b/linkis-computation-governance/linkis-jdbc-driver/pom.xml
index ef2238d2f..f5c89d51f 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/pom.xml
+++ b/linkis-computation-governance/linkis-jdbc-driver/pom.xml
@@ -24,7 +24,7 @@
         <artifactId>linkis</artifactId>
         <groupId>org.apache.linkis</groupId>
         <version>1.3.0</version>
-<!--        <relativePath>../../pom.xml</relativePath>-->
+        <relativePath>../../pom.xml</relativePath>
     </parent>
     <artifactId>linkis-jdbc-driver</artifactId>
 
diff --git a/linkis-dist/pom.xml b/linkis-dist/pom.xml
index 56d9a6460..63e19c9f3 100644
--- a/linkis-dist/pom.xml
+++ b/linkis-dist/pom.xml
@@ -147,7 +147,6 @@
     <dependency>
       <groupId>commons-beanutils</groupId>
       <artifactId>commons-beanutils</artifactId>
-      <version>1.9.4</version>
     </dependency>
 
   </dependencies>
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml
index e66ec22e5..9e76a59ca 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml
@@ -31,6 +31,14 @@
   </properties>
 
   <dependencies>
+    <!--rpc-->
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-protocol</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.linkis</groupId>
       <artifactId>linkis-module</artifactId>
@@ -42,6 +50,7 @@
         </exclusion>
       </exclusions>
     </dependency>
+
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-text</artifactId>
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala
index f2e04319e..510382d36 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala
@@ -17,18 +17,26 @@
 
 package org.apache.linkis.datasourcemanager.common.protocol
 
+import org.apache.linkis.protocol.message.RequestProtocol
+
 import java.util
 
 /**
  * Store error code map
  */
-trait DsmQueryProtocol {}
+trait DsmQueryProtocol extends RequestProtocol {}
 
 /**
  * Query request of Data Source Information
  * @param id
  */
-case class DsInfoQueryRequest(id: String, name: String, system: String) extends DsmQueryProtocol
+case class DsInfoQueryRequest(id: String, name: String, system: String) extends DsmQueryProtocol {
+
+  def isValid: Boolean = {
+    (Option(id).isDefined || Option(name).isDefined) && Option(system).isDefined
+  }
+
+}
 
 /**
  * Response of parameter map
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala
index 0612655c4..88aec4a7e 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala
@@ -17,7 +17,7 @@
 
 package org.apache.linkis.datasourcemanager.core.receivers
 
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.datasourcemanager.common.domain.DataSource
 import org.apache.linkis.datasourcemanager.common.protocol.{DsInfoQueryRequest, DsInfoResponse}
 import org.apache.linkis.datasourcemanager.core.restful.RestfulApiHelper
@@ -25,17 +25,19 @@ import org.apache.linkis.datasourcemanager.core.service.{
   DataSourceInfoService,
   DataSourceRelateService
 }
-import org.apache.linkis.rpc.{Receiver, Sender}
+import org.apache.linkis.rpc.message.annotation.Receiver
 
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Component
 
 import java.util
 
-import scala.concurrent.duration.Duration
+import org.slf4j.LoggerFactory
 
 @Component
-class DsmReceiver extends Receiver with Logging {
+class DsmReceiver {
+
+  private val logger = LoggerFactory.getLogger(classOf[DsmReceiver])
 
   @Autowired
   private var dataSourceInfoService: DataSourceInfoService = _
@@ -43,84 +45,51 @@ class DsmReceiver extends Receiver with Logging {
   @Autowired
   private var dataSourceRelateService: DataSourceRelateService = _
 
-  override def receive(message: Any, sender: Sender): Unit = {}
-
-  override def receiveAndReply(message: Any, sender: Sender): Any = message match {
-    case DsInfoQueryRequest(id, name, system) =>
-      if ((Option(id).isDefined || Option(name).isDefined) && Some(system).isDefined) {
-        Utils.tryCatch {
-          var dataSource: DataSource = null
-          if (Option(name).isDefined) {
-            logger.info("Try to get dataSource by dataSourceName:" + name)
-            dataSource = dataSourceInfoService.getDataSourceInfoForConnect(name)
-          } else if (id.toLong > 0) {
-            logger.info("Try to get dataSource by dataSourceId:" + id)
-            dataSource = dataSourceInfoService.getDataSourceInfoForConnect(id.toLong)
-          }
-          if (null != dataSource) {
-            RestfulApiHelper.decryptPasswordKey(
-              dataSourceRelateService.getKeyDefinitionsByType(dataSource.getDataSourceTypeId),
-              dataSource.getConnectParams
-            )
-            DsInfoResponse(
-              status = true,
-              dataSource.getDataSourceType.getName,
-              dataSource.getConnectParams,
-              dataSource.getCreateUser
-            )
-          } else {
-            logger.warn("Can not get any dataSource")
-            DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
-          }
-        } {
-          case e: Exception =>
-            logger.error(s"Fail to get data source information, id:$id system:$system", e)
-            DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
-          case t: Throwable =>
-            logger.error(s"Fail to get data source information, id:$id system:$system", t)
-            DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
+  @Receiver
+  def dealDsInfoQueryRequest(dsInfoQueryRequest: DsInfoQueryRequest): Any = {
+    if (dsInfoQueryRequest.isValid) {
+      Utils.tryCatch {
+        var dataSource: DataSource = null
+        if (Option(dsInfoQueryRequest.name).isDefined) {
+          logger.info("Try to get dataSource by dataSourceName:" + dsInfoQueryRequest.name)
+          dataSource = dataSourceInfoService.getDataSourceInfoForConnect(dsInfoQueryRequest.name)
+        } else if (dsInfoQueryRequest.id.toLong > 0) {
+          logger.info("Try to get dataSource by dataSourceId:" + dsInfoQueryRequest.id)
+          dataSource =
+            dataSourceInfoService.getDataSourceInfoForConnect(dsInfoQueryRequest.id.toLong)
         }
-      } else {
-        DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
-      }
-    case _ => new Object()
-  }
-
-  override def receiveAndReply(message: Any, duration: Duration, sender: Sender): Any =
-    message match {
-      case DsInfoQueryRequest(id, name, system) =>
-        if ((Option(id).isDefined || Option(name).isDefined) && Some(system).isDefined) {
-          Utils.tryCatch {
-            var dataSource: DataSource = null
-            if (Option(name).isDefined) {
-              dataSource = dataSourceInfoService.getDataSourceInfoForConnect(name)
-            } else if (id.toLong > 0) {
-              dataSource = dataSourceInfoService.getDataSourceInfoForConnect(id.toLong)
-            }
-            if (null != dataSource) {
-              RestfulApiHelper.decryptPasswordKey(
-                dataSourceRelateService.getKeyDefinitionsByType(dataSource.getDataSourceTypeId),
-                dataSource.getConnectParams
-              )
-              DsInfoResponse(
-                status = true,
-                dataSource.getDataSourceType.getName,
-                dataSource.getConnectParams,
-                dataSource.getCreateUser
-              )
-            } else DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
-          } {
-            case e: Exception =>
-              logger.error(s"Fail to get data source information, id:$id system:$system", e)
-              DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
-            case t: Throwable =>
-              logger.error(s"Fail to get data source information, id:$id system:$system", t)
-              DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
-          }
+        if (null != dataSource) {
+          RestfulApiHelper.decryptPasswordKey(
+            dataSourceRelateService.getKeyDefinitionsByType(dataSource.getDataSourceTypeId),
+            dataSource.getConnectParams
+          )
+          DsInfoResponse(
+            status = true,
+            dataSource.getDataSourceType.getName,
+            dataSource.getConnectParams,
+            dataSource.getCreateUser
+          )
         } else {
+          logger.warn("Can not get any dataSource")
           DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
         }
-      case _ => new Object()
+      } {
+        case e: Exception =>
+          logger.error(
+            s"Fail to get data source information, id:${dsInfoQueryRequest.id} system:${dsInfoQueryRequest.system}",
+            e
+          )
+          DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
+        case t: Throwable =>
+          logger.error(
+            s"Fail to get data source information, id:{dsInfoQueryRequest.id} system:${dsInfoQueryRequest.system}",
+            t
+          )
+          DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
+      }
+    } else {
+      DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
     }
+  }
 
 }
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml
index 47d5c8a07..84a291554 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml
@@ -45,7 +45,7 @@
     <!--rpc-->
     <dependency>
       <groupId>org.apache.linkis</groupId>
-      <artifactId>linkis-rpc</artifactId>
+      <artifactId>linkis-protocol</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala
index cf27ed477..d0f27b5b1 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala
@@ -17,9 +17,11 @@
 
 package org.apache.linkis.metadata.query.common.protocol
 
+import org.apache.linkis.protocol.message.RequestProtocol
+
 import java.util
 
-trait MetadataOperateProtocol {}
+trait MetadataOperateProtocol extends RequestProtocol {}
 
 /**
  * Request to do connect
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala
index 46a6be292..c387ad3ad 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala
@@ -18,54 +18,48 @@
 package org.apache.linkis.metadata.query.server.receiver
 
 import org.apache.linkis.common.exception.WarnException
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.metadata.query.common.exception.MetaMethodInvokeException
 import org.apache.linkis.metadata.query.common.protocol.{MetadataConnect, MetadataResponse}
 import org.apache.linkis.metadata.query.server.service.MetadataQueryService
-import org.apache.linkis.rpc.{Receiver, Sender}
+import org.apache.linkis.rpc.message.annotation.Receiver
 import org.apache.linkis.server.BDPJettyServerHelper
 
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Component
 
-import scala.concurrent.duration.Duration
+import org.slf4j.LoggerFactory
 
 @Component
-class BaseMetaReceiver extends Receiver with Logging {
+class BaseMetaReceiver {
+
+  private val logger = LoggerFactory.getLogger(classOf[BaseMetaReceiver])
 
   @Autowired
   private var metadataQueryService: MetadataQueryService = _
 
-  override def receive(message: Any, sender: Sender): Unit = {}
-
-  override def receiveAndReply(message: Any, sender: Sender): Any =
-    invoke(metadataQueryService, message)
-
-  override def receiveAndReply(message: Any, duration: Duration, sender: Sender): Any =
-    invoke(metadataQueryService, message)
-
-  def invoke(service: MetadataQueryService, message: Any): Any = Utils.tryCatch {
-    val data = message match {
-      case MetadataConnect(dataSourceType, operator, params, version) =>
-        service.getConnection(dataSourceType, operator, params)
-        // MetadataConnection is not scala class
-        null
-      case _ => new Object()
+  @Receiver
+  def dealMetadataConnectRequest(metadataConnect: MetadataConnect): MetadataResponse =
+    Utils.tryCatch {
+      metadataQueryService.getConnection(
+        metadataConnect.dataSourceType,
+        metadataConnect.operator,
+        metadataConnect.params
+      )
+      MetadataResponse(status = true, BDPJettyServerHelper.gson.toJson(null))
+    } {
+      case e: WarnException =>
+        val errorMsg = e.getMessage
+        logger.trace(s"Fail to invoke meta service: [$errorMsg]")
+        MetadataResponse(status = false, errorMsg)
+      case t: Exception =>
+        t match {
+          case exception: MetaMethodInvokeException =>
+            MetadataResponse(status = false, exception.getCause.getMessage)
+          case _ =>
+            logger.error(s"Fail to invoke meta service", t)
+            MetadataResponse(status = false, t.getMessage)
+        }
     }
-    MetadataResponse(status = true, BDPJettyServerHelper.gson.toJson(data))
-  } {
-    case e: WarnException =>
-      val errorMsg = e.getMessage
-      logger.trace(s"Fail to invoke meta service: [$errorMsg]")
-      MetadataResponse(status = false, errorMsg)
-    case t: Exception =>
-      t match {
-        case exception: MetaMethodInvokeException =>
-          MetadataResponse(status = false, exception.getCause.getMessage)
-        case _ =>
-          logger.error(s"Fail to invoke meta service", t)
-          MetadataResponse(status = false, t.getMessage)
-      }
-  }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org