You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/09/07 06:24:09 UTC
[incubator-linkis] branch dev-1.3.0 updated: fix(pes): metadataConnect Rpc call failed (#3253)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new cea55fe22 fix(pes): metadataConnect Rpc call failed (#3253)
cea55fe22 is described below
commit cea55fe22e243319f30ba0f3f4c096bde8b029a6
Author: Jack Xu <xu...@126.com>
AuthorDate: Wed Sep 7 14:24:04 2022 +0800
fix(pes): metadataConnect Rpc call failed (#3253)
* fix(pes): metadataConnect Rpc call failed
---
linkis-commons/linkis-protocol/pom.xml | 1 +
.../org/apache/linkis/rpc/RPCReceiveRestful.scala | 21 ++--
.../apache/linkis/rpc/transform/RPCConsumer.scala | 4 +
linkis-commons/linkis-scheduler/pom.xml | 1 +
.../engineconn/launch/EngineConnServer.scala | 2 +-
.../callback/hook/CallbackEngineConnHook.scala | 2 +-
.../linkis-jdbc-driver/pom.xml | 2 +-
linkis-dist/pom.xml | 1 -
.../linkis-datasource-manager/common/pom.xml | 9 ++
.../common/protocol/DsmQueryProtocol.scala | 12 +-
.../core/receivers/DsmReceiver.scala | 125 ++++++++-------------
.../linkis-metadata-query/common/pom.xml | 2 +-
.../common/protocol/MetadataOperateProtocol.scala | 4 +-
.../query/server/receiver/BaseMetaReceiver.scala | 62 +++++-----
14 files changed, 121 insertions(+), 127 deletions(-)
diff --git a/linkis-commons/linkis-protocol/pom.xml b/linkis-commons/linkis-protocol/pom.xml
index 6f36aee03..82187a6a3 100644
--- a/linkis-commons/linkis-protocol/pom.xml
+++ b/linkis-commons/linkis-protocol/pom.xml
@@ -21,6 +21,7 @@
<groupId>org.apache.linkis</groupId>
<artifactId>linkis</artifactId>
<version>1.3.0</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>linkis-protocol</artifactId>
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
index ab92151b3..4bc97e8d7 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
@@ -139,7 +139,7 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
})
}
- private implicit def toMessage(obj: Any): Message = obj match {
+ private def toMessage(obj: Any): Message = obj match {
case Unit | () | null =>
RPCProduct.getRPCProduct.ok()
case _: BoxedUnit => RPCProduct.getRPCProduct.ok()
@@ -152,29 +152,36 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
val obj = RPCConsumer.getRPCConsumer.toObject(message)
val event = RPCMessageEvent(obj, BaseRPCSender.getInstanceInfo(message.getData))
rpcReceiverListenerBus.post(event)
+ toMessage(Unit)
}
- private def receiveAndReply(
+ private def receiveAndReplyWithMessage(
message: Message,
- opEvent: (Receiver, Any, Sender) => Message
+ opEvent: (Receiver, Any, Sender) => Any
): Message = catchIt {
val obj = RPCConsumer.getRPCConsumer.toObject(message)
val serviceInstance = BaseRPCSender.getInstanceInfo(message.getData)
val event = RPCMessageEvent(obj, serviceInstance)
- event.map(opEvent(_, obj, event)).getOrElse(RPCProduct.getRPCProduct.notFound())
+ event
+ .map(receiver => {
+ logger.debug("show the receiver {}", receiver.getClass)
+ toMessage(opEvent(receiver, obj, event))
+ })
+ .getOrElse(RPCProduct.getRPCProduct.notFound())
}
@RequestMapping(path = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST))
override def receiveAndReply(@RequestBody message: Message): Message =
- receiveAndReply(message, _.receiveAndReply(_, _))
+ receiveAndReplyWithMessage(message, _.receiveAndReply(_, _))
@RequestMapping(path = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST))
override def receiveAndReplyInMills(@RequestBody message: Message): Message = catchIt {
val duration = message.getData.get("duration")
- if (duration == null || StringUtils.isEmpty(duration.toString))
+ if (duration == null || StringUtils.isEmpty(duration.toString)) {
throw new DWCURIException(10002, "The timeout period is not set!(超时时间未设置!)")
+ }
val timeout = Duration(duration.toString.toLong, TimeUnit.MILLISECONDS)
- receiveAndReply(message, _.receiveAndReply(_, timeout, _))
+ receiveAndReplyWithMessage(message, _.receiveAndReply(_, timeout, _))
}
}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
index 4184d1b55..3191da79a 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
@@ -25,6 +25,8 @@ import org.apache.linkis.server.{EXCEPTION_MSG, JMap, Message}
import scala.runtime.BoxedUnit
+import org.slf4j.LoggerFactory
+
private[linkis] trait RPCConsumer {
def toObject(message: Message): Any
@@ -32,6 +34,7 @@ private[linkis] trait RPCConsumer {
}
private[linkis] object RPCConsumer {
+ private val logger = LoggerFactory.getLogger(getClass)
import RPCProduct._
private val rpcConsumer: RPCConsumer = new RPCConsumer {
@@ -43,6 +46,7 @@ private[linkis] object RPCConsumer {
if (data.isEmpty) return BoxedUnit.UNIT
val objectStr = data.get(OBJECT_VALUE).toString
val objectClass = data.get(CLASS_VALUE).toString
+ logger.debug("The corresponding anti-sequence is class {}", objectClass)
val clazz = Utils.tryThrow(Class.forName(objectClass)) {
case _: ClassNotFoundException =>
new DWCURIException(
diff --git a/linkis-commons/linkis-scheduler/pom.xml b/linkis-commons/linkis-scheduler/pom.xml
index 73ce5f6a1..9991e24c3 100644
--- a/linkis-commons/linkis-scheduler/pom.xml
+++ b/linkis-commons/linkis-scheduler/pom.xml
@@ -21,6 +21,7 @@
<groupId>org.apache.linkis</groupId>
<artifactId>linkis</artifactId>
<version>1.3.0</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>linkis-scheduler</artifactId>
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
index f37d25d5f..88f9112b3 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
@@ -80,7 +80,7 @@ object EngineConnServer extends Logging {
logger.info("Finished to execute hook of beforeCreateEngineConn.")
// 2. cresate EngineConn
val engineConn = getEngineConnManager.createEngineConn(getEngineCreationContext)
- logger.info(s"Finished to create ${engineConn.getEngineConnType}EngineConn.")
+ logger.info(s"Finished to create ${engineConn.getEngineConnType} EngineConn.")
EngineConnHook.getEngineConnHooks.foreach(
_.beforeExecutionExecute(getEngineCreationContext, engineConn)
)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
index 777f1af44..d7ad2c797 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
@@ -47,7 +47,7 @@ class CallbackEngineConnHook extends EngineConnHook with Logging {
val parser = DWCArgumentsParser.parse(engineCreationContext.getArgs)
DWCArgumentsParser.setDWCOptionMap(parser.getDWCConfMap)
val existsExcludePackages = ServerConfiguration.BDP_SERVER_EXCLUDE_PACKAGES.getValue
- if (!StringUtils.isEmpty(existsExcludePackages)) {
+ if (StringUtils.isNotBlank(existsExcludePackages)) {
DataWorkCloudApplication.setProperty(
ServerConfiguration.BDP_SERVER_EXCLUDE_PACKAGES.key,
existsExcludePackages
diff --git a/linkis-computation-governance/linkis-jdbc-driver/pom.xml b/linkis-computation-governance/linkis-jdbc-driver/pom.xml
index ef2238d2f..f5c89d51f 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/pom.xml
+++ b/linkis-computation-governance/linkis-jdbc-driver/pom.xml
@@ -24,7 +24,7 @@
<artifactId>linkis</artifactId>
<groupId>org.apache.linkis</groupId>
<version>1.3.0</version>
-<!-- <relativePath>../../pom.xml</relativePath>-->
+ <relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>linkis-jdbc-driver</artifactId>
diff --git a/linkis-dist/pom.xml b/linkis-dist/pom.xml
index 56d9a6460..63e19c9f3 100644
--- a/linkis-dist/pom.xml
+++ b/linkis-dist/pom.xml
@@ -147,7 +147,6 @@
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
- <version>1.9.4</version>
</dependency>
</dependencies>
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml
index e66ec22e5..9e76a59ca 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/pom.xml
@@ -31,6 +31,14 @@
</properties>
<dependencies>
+ <!--rpc-->
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-protocol</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-module</artifactId>
@@ -42,6 +50,7 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala
index f2e04319e..510382d36 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/common/src/main/scala/org/apache/linkis/datasourcemanager/common/protocol/DsmQueryProtocol.scala
@@ -17,18 +17,26 @@
package org.apache.linkis.datasourcemanager.common.protocol
+import org.apache.linkis.protocol.message.RequestProtocol
+
import java.util
/**
* Store error code map
*/
-trait DsmQueryProtocol {}
+trait DsmQueryProtocol extends RequestProtocol {}
/**
* Query request of Data Source Information
* @param id
*/
-case class DsInfoQueryRequest(id: String, name: String, system: String) extends DsmQueryProtocol
+case class DsInfoQueryRequest(id: String, name: String, system: String) extends DsmQueryProtocol {
+
+ def isValid: Boolean = {
+ (Option(id).isDefined || Option(name).isDefined) && Option(system).isDefined
+ }
+
+}
/**
* Response of parameter map
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala
index 0612655c4..88aec4a7e 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/server/src/main/scala/org/apache/linkis/datasourcemanager/core/receivers/DsmReceiver.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.datasourcemanager.core.receivers
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Utils
import org.apache.linkis.datasourcemanager.common.domain.DataSource
import org.apache.linkis.datasourcemanager.common.protocol.{DsInfoQueryRequest, DsInfoResponse}
import org.apache.linkis.datasourcemanager.core.restful.RestfulApiHelper
@@ -25,17 +25,19 @@ import org.apache.linkis.datasourcemanager.core.service.{
DataSourceInfoService,
DataSourceRelateService
}
-import org.apache.linkis.rpc.{Receiver, Sender}
+import org.apache.linkis.rpc.message.annotation.Receiver
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import java.util
-import scala.concurrent.duration.Duration
+import org.slf4j.LoggerFactory
@Component
-class DsmReceiver extends Receiver with Logging {
+class DsmReceiver {
+
+ private val logger = LoggerFactory.getLogger(classOf[DsmReceiver])
@Autowired
private var dataSourceInfoService: DataSourceInfoService = _
@@ -43,84 +45,51 @@ class DsmReceiver extends Receiver with Logging {
@Autowired
private var dataSourceRelateService: DataSourceRelateService = _
- override def receive(message: Any, sender: Sender): Unit = {}
-
- override def receiveAndReply(message: Any, sender: Sender): Any = message match {
- case DsInfoQueryRequest(id, name, system) =>
- if ((Option(id).isDefined || Option(name).isDefined) && Some(system).isDefined) {
- Utils.tryCatch {
- var dataSource: DataSource = null
- if (Option(name).isDefined) {
- logger.info("Try to get dataSource by dataSourceName:" + name)
- dataSource = dataSourceInfoService.getDataSourceInfoForConnect(name)
- } else if (id.toLong > 0) {
- logger.info("Try to get dataSource by dataSourceId:" + id)
- dataSource = dataSourceInfoService.getDataSourceInfoForConnect(id.toLong)
- }
- if (null != dataSource) {
- RestfulApiHelper.decryptPasswordKey(
- dataSourceRelateService.getKeyDefinitionsByType(dataSource.getDataSourceTypeId),
- dataSource.getConnectParams
- )
- DsInfoResponse(
- status = true,
- dataSource.getDataSourceType.getName,
- dataSource.getConnectParams,
- dataSource.getCreateUser
- )
- } else {
- logger.warn("Can not get any dataSource")
- DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
- }
- } {
- case e: Exception =>
- logger.error(s"Fail to get data source information, id:$id system:$system", e)
- DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
- case t: Throwable =>
- logger.error(s"Fail to get data source information, id:$id system:$system", t)
- DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
+ @Receiver
+ def dealDsInfoQueryRequest(dsInfoQueryRequest: DsInfoQueryRequest): Any = {
+ if (dsInfoQueryRequest.isValid) {
+ Utils.tryCatch {
+ var dataSource: DataSource = null
+ if (Option(dsInfoQueryRequest.name).isDefined) {
+ logger.info("Try to get dataSource by dataSourceName:" + dsInfoQueryRequest.name)
+ dataSource = dataSourceInfoService.getDataSourceInfoForConnect(dsInfoQueryRequest.name)
+ } else if (dsInfoQueryRequest.id.toLong > 0) {
+ logger.info("Try to get dataSource by dataSourceId:" + dsInfoQueryRequest.id)
+ dataSource =
+ dataSourceInfoService.getDataSourceInfoForConnect(dsInfoQueryRequest.id.toLong)
}
- } else {
- DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
- }
- case _ => new Object()
- }
-
- override def receiveAndReply(message: Any, duration: Duration, sender: Sender): Any =
- message match {
- case DsInfoQueryRequest(id, name, system) =>
- if ((Option(id).isDefined || Option(name).isDefined) && Some(system).isDefined) {
- Utils.tryCatch {
- var dataSource: DataSource = null
- if (Option(name).isDefined) {
- dataSource = dataSourceInfoService.getDataSourceInfoForConnect(name)
- } else if (id.toLong > 0) {
- dataSource = dataSourceInfoService.getDataSourceInfoForConnect(id.toLong)
- }
- if (null != dataSource) {
- RestfulApiHelper.decryptPasswordKey(
- dataSourceRelateService.getKeyDefinitionsByType(dataSource.getDataSourceTypeId),
- dataSource.getConnectParams
- )
- DsInfoResponse(
- status = true,
- dataSource.getDataSourceType.getName,
- dataSource.getConnectParams,
- dataSource.getCreateUser
- )
- } else DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
- } {
- case e: Exception =>
- logger.error(s"Fail to get data source information, id:$id system:$system", e)
- DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
- case t: Throwable =>
- logger.error(s"Fail to get data source information, id:$id system:$system", t)
- DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
- }
+ if (null != dataSource) {
+ RestfulApiHelper.decryptPasswordKey(
+ dataSourceRelateService.getKeyDefinitionsByType(dataSource.getDataSourceTypeId),
+ dataSource.getConnectParams
+ )
+ DsInfoResponse(
+ status = true,
+ dataSource.getDataSourceType.getName,
+ dataSource.getConnectParams,
+ dataSource.getCreateUser
+ )
} else {
+ logger.warn("Can not get any dataSource")
DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
}
- case _ => new Object()
+ } {
+ case e: Exception =>
+ logger.error(
+ s"Fail to get data source information, id:${dsInfoQueryRequest.id} system:${dsInfoQueryRequest.system}",
+ e
+ )
+ DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
+ case t: Throwable =>
+ logger.error(
+ s"Fail to get data source information, id:{dsInfoQueryRequest.id} system:${dsInfoQueryRequest.system}",
+ t
+ )
+ DsInfoResponse(status = false, "", new util.HashMap[String, Object](), "")
+ }
+ } else {
+ DsInfoResponse(status = true, "", new util.HashMap[String, Object](), "")
}
+ }
}
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml
index 47d5c8a07..84a291554 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/pom.xml
@@ -45,7 +45,7 @@
<!--rpc-->
<dependency>
<groupId>org.apache.linkis</groupId>
- <artifactId>linkis-rpc</artifactId>
+ <artifactId>linkis-protocol</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala
index cf27ed477..d0f27b5b1 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/scala/org/apache/linkis/metadata/query/common/protocol/MetadataOperateProtocol.scala
@@ -17,9 +17,11 @@
package org.apache.linkis.metadata.query.common.protocol
+import org.apache.linkis.protocol.message.RequestProtocol
+
import java.util
-trait MetadataOperateProtocol {}
+trait MetadataOperateProtocol extends RequestProtocol {}
/**
* Request to do connect
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala
index 46a6be292..c387ad3ad 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/scala/org/apache/linkis/metadata/query/server/receiver/BaseMetaReceiver.scala
@@ -18,54 +18,48 @@
package org.apache.linkis.metadata.query.server.receiver
import org.apache.linkis.common.exception.WarnException
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Utils
import org.apache.linkis.metadata.query.common.exception.MetaMethodInvokeException
import org.apache.linkis.metadata.query.common.protocol.{MetadataConnect, MetadataResponse}
import org.apache.linkis.metadata.query.server.service.MetadataQueryService
-import org.apache.linkis.rpc.{Receiver, Sender}
+import org.apache.linkis.rpc.message.annotation.Receiver
import org.apache.linkis.server.BDPJettyServerHelper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
-import scala.concurrent.duration.Duration
+import org.slf4j.LoggerFactory
@Component
-class BaseMetaReceiver extends Receiver with Logging {
+class BaseMetaReceiver {
+
+ private val logger = LoggerFactory.getLogger(classOf[BaseMetaReceiver])
@Autowired
private var metadataQueryService: MetadataQueryService = _
- override def receive(message: Any, sender: Sender): Unit = {}
-
- override def receiveAndReply(message: Any, sender: Sender): Any =
- invoke(metadataQueryService, message)
-
- override def receiveAndReply(message: Any, duration: Duration, sender: Sender): Any =
- invoke(metadataQueryService, message)
-
- def invoke(service: MetadataQueryService, message: Any): Any = Utils.tryCatch {
- val data = message match {
- case MetadataConnect(dataSourceType, operator, params, version) =>
- service.getConnection(dataSourceType, operator, params)
- // MetadataConnection is not scala class
- null
- case _ => new Object()
+ @Receiver
+ def dealMetadataConnectRequest(metadataConnect: MetadataConnect): MetadataResponse =
+ Utils.tryCatch {
+ metadataQueryService.getConnection(
+ metadataConnect.dataSourceType,
+ metadataConnect.operator,
+ metadataConnect.params
+ )
+ MetadataResponse(status = true, BDPJettyServerHelper.gson.toJson(null))
+ } {
+ case e: WarnException =>
+ val errorMsg = e.getMessage
+ logger.trace(s"Fail to invoke meta service: [$errorMsg]")
+ MetadataResponse(status = false, errorMsg)
+ case t: Exception =>
+ t match {
+ case exception: MetaMethodInvokeException =>
+ MetadataResponse(status = false, exception.getCause.getMessage)
+ case _ =>
+ logger.error(s"Fail to invoke meta service", t)
+ MetadataResponse(status = false, t.getMessage)
+ }
}
- MetadataResponse(status = true, BDPJettyServerHelper.gson.toJson(data))
- } {
- case e: WarnException =>
- val errorMsg = e.getMessage
- logger.trace(s"Fail to invoke meta service: [$errorMsg]")
- MetadataResponse(status = false, errorMsg)
- case t: Exception =>
- t match {
- case exception: MetaMethodInvokeException =>
- MetadataResponse(status = false, exception.getCause.getMessage)
- case _ =>
- logger.error(s"Fail to invoke meta service", t)
- MetadataResponse(status = false, t.getMessage)
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org