You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/05/31 03:33:17 UTC
[incubator-linkis] branch dev-1.1.3 updated: LinkisManager record table records ec information (#2151)
This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.3 by this push:
new 6ef9fc9c1 LinkisManager record table records ec information (#2151)
6ef9fc9c1 is described below
commit 6ef9fc9c1189ee59584ea317c1369e2e77249647
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue May 31 11:33:12 2022 +0800
LinkisManager record table records ec information (#2151)
* add ec info record feature
* optimize ec log from tail to read
* add ec resource info action
* use spotless reformat code
* optimize code
* optimize code
* add watermark switch
* update ec info ddl
* update get page size
* remove reset creator
* optimize read log operator support from tail read
* modify ddl
---
db/linkis_ddl.sql | 36 ++--
.../client/once/action/ECResourceInfoAction.scala | 53 +++++
.../client/once/result/ECResourceInfoResult.scala | 38 ++++
.../governance/common/utils/ECPathUtils.scala | 37 ++++
.../server/operator/EngineConnLogOperator.scala | 85 +++++---
.../impl/DefaultLocalDirsHandleService.scala | 17 +-
.../interceptor/impl/CSEntranceInterceptor.scala | 2 +-
.../am/restful/ECResourceInfoRestfulApi.java | 70 +++++++
.../linkis/manager/am/restful/EMRestfulApi.java | 3 +
.../manager/am/service/ECResourceInfoService.java | 32 +++
.../am/service/impl/ECResourceInfoServiceImpl.java | 61 ++++++
.../entity/persistence/ECResourceInfoRecord.java | 216 +++++++++++++++++++++
.../PersistenceResourceActionRecord.java | 155 ---------------
.../linkis/manager/dao/ECResourceRecordMapper.java | 37 ++++
.../linkis/manager/dao/ResourceManagerMapper.java | 10 -
.../manager/dao/impl/ECResourceRecordMapper.xml | 63 ++++++
.../manager/dao/impl/ResourceManagerMapper.xml | 19 --
.../persistence/ResourceManagerPersistence.java | 7 -
.../impl/DefaultResourceManagerPersistence.java | 16 --
.../service/impl/DefaultResourceManager.scala | 81 ++++----
.../service/impl/ResourceLogService.scala | 194 ++++++++----------
.../linkis/resourcemanager/utils/RMUtils.scala | 2 +-
.../gateway/config/GatewayConfiguration.scala | 2 +
.../linkis/gateway/security/UserRestful.scala | 2 +-
scalastyle-config.xml | 2 +-
25 files changed, 818 insertions(+), 422 deletions(-)
diff --git a/db/linkis_ddl.sql b/db/linkis_ddl.sql
index c798d3e74..80a52f7af 100644
--- a/db/linkis_ddl.sql
+++ b/db/linkis_ddl.sql
@@ -730,21 +730,27 @@ CREATE TABLE `linkis_cg_manager_label_resource` (
UNIQUE KEY `label_id` (`label_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-DROP TABLE IF EXISTS `linkis_cg_rm_resource_action_record`;
-CREATE TABLE `linkis_cg_rm_resource_action_record` (
- `id` INT(20) NOT NULL AUTO_INCREMENT,
- `label_value` VARCHAR(100) NOT NULL,
- `ticket_id` VARCHAR(100) NOT NULL,
- `request_times` INT(8),
- `request_resource_all` VARCHAR(100),
- `used_times` INT(8),
- `used_resource_all` VARCHAR(100),
- `release_times` INT(8),
- `release_resource_all` VARCHAR(100),
- `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
- `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY (`id`),
- UNIQUE KEY `label_value_ticket_id` (`label_value`, `ticket_id`)
+DROP TABLE IF EXISTS `linkis_cg_ec_resource_info_record`;
+CREATE TABLE `linkis_cg_ec_resource_info_record` (
+ `id` INT(20) NOT NULL AUTO_INCREMENT,
+ `label_value` VARCHAR(255) NOT NULL COMMENT 'ec labels stringValue',
+ `create_user` VARCHAR(128) NOT NULL COMMENT 'ec create user',
+ `service_instance` varchar(128) COLLATE utf8_bin DEFAULT NULL COMMENT 'ec instance info',
+ `ecm_instance` varchar(128) COLLATE utf8_bin DEFAULT NULL COMMENT 'ecm instance info ',
+ `ticket_id` VARCHAR(100) NOT NULL COMMENT 'ec ticket id',
+ `log_dir_suffix` varchar(128) COLLATE utf8_bin DEFAULT NULL COMMENT 'log path',
+ `request_times` INT(8) COMMENT 'resource request times',
+ `request_resource` VARCHAR(255) COMMENT 'request resource',
+ `used_times` INT(8) COMMENT 'resource used times',
+ `used_resource` VARCHAR(255) COMMENT 'used resource',
+ `release_times` INT(8) COMMENT 'resource released times',
+ `released_resource` VARCHAR(255) COMMENT 'released resource',
+ `release_time` datetime DEFAULT NULL COMMENT 'released time',
+ `used_time` datetime DEFAULT NULL COMMENT 'used time',
+ `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ KEY (`ticket_id`),
+ UNIQUE KEY `label_value_ticket_id` (`ticket_id`,`label_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
DROP TABLE IF EXISTS `linkis_cg_manager_label_service_instance`;
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/ECResourceInfoAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/ECResourceInfoAction.scala
new file mode 100644
index 000000000..e872c6916
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/ECResourceInfoAction.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.action
+
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
+
+class ECResourceInfoAction extends GetAction with LinkisManagerAction {
+ override def suffixURLs: Array[String] = Array("linkisManager", "ecinfo/get")
+}
+object ECResourceInfoAction {
+ def newBuilder(): Builder = new Builder
+ class Builder private[ECResourceInfoAction]() {
+
+ private var user: String = _
+
+ private var ticketid: String = _
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def setTicketid(ticketid: String): Builder = {
+ this.ticketid = ticketid
+ this
+ }
+
+ def build(): ECResourceInfoAction = {
+ if (user == null) throw new UJESClientBuilderException("user is needed!")
+ val ecResourceInfoAction = new ECResourceInfoAction
+ ecResourceInfoAction.setUser(user)
+ ecResourceInfoAction.setParameter("ticketid", ticketid)
+ ecResourceInfoAction
+ }
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/ECResourceInfoResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/ECResourceInfoResult.scala
new file mode 100644
index 000000000..c02843698
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/ECResourceInfoResult.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.result
+
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+
+import java.util
+
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/ecinfo/get")
+class ECResourceInfoResult extends LinkisManagerResult {
+
+ private var ecResourceInfoRecord: util.Map[String, Any] = _
+
+ def setECResourceInfoRecord(ecResourceInfoRecord: util.Map[String, Any]): Unit = {
+ this.ecResourceInfoRecord = ecResourceInfoRecord
+ }
+
+ def getECResourceInfoRecord: util.Map[String, Any] = {
+ this.ecResourceInfoRecord
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/ECPathUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/ECPathUtils.scala
new file mode 100644
index 000000000..9cee398d7
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/ECPathUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.utils
+
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.io.File
+import java.nio.file.Paths
+
+object ECPathUtils {
+
+ def getECWOrkDirPathSuffix(user: String, ticketId: String, engineType: String, timeStamp: Long = System.currentTimeMillis()): String = {
+ val suffix = if (StringUtils.isBlank(engineType)) {
+ Paths.get(user, DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd")).toFile.getPath
+ } else {
+ Paths.get(user, DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd"), engineType).toFile.getPath
+ }
+ suffix + File.separator + ticketId
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.scala
index 60d6c2d97..296aa0c58 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.scala
@@ -18,19 +18,23 @@
package org.apache.linkis.ecm.server.operator
-import java.io.{File, RandomAccessFile}
-import java.util
+import org.apache.commons.io.IOUtils
+import org.apache.commons.io.input.ReversedLinesFileReader
+import org.apache.commons.lang.StringUtils
import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.ecm.core.conf.ECMErrorCode
+import org.apache.linkis.ecm.server.conf.ECMConfiguration
import org.apache.linkis.ecm.server.exception.ECMErrorException
import org.apache.linkis.ecm.server.service.{EngineConnListService, LocalDirsHandleService}
import org.apache.linkis.manager.common.operator.Operator
import org.apache.linkis.manager.common.protocol.em.ECMOperateRequest
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang.StringUtils
-import org.apache.linkis.ecm.core.conf.ECMErrorCode
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.Charset
+import java.util
+import java.util.Collections
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.util.matching.Regex
@@ -44,19 +48,35 @@ class EngineConnLogOperator extends Operator with Logging {
override def apply(implicit parameters: Map[String, Any]): Map[String, Any] = {
val logPath = getLogPath
val lastRows = getAs("lastRows", 0)
+ val pageSize = getAs("pageSize", 100)
+ val fromLine = getAs("fromLine", 1)
+ val enableTail = getAs("enableTail", false)
if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue) {
throw new ECMErrorException(ECMErrorCode.EC_FETCH_LOG_FAILED, s"Cannot fetch more than ${EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue} lines of logs.")
} else if (lastRows > 0) {
val logs = Utils.exec(Array("tail", "-n", lastRows + "", logPath.getPath), 5000).split("\n")
return Map("logs" -> logs, "rows" -> logs.length)
}
- val pageSize = getAs("pageSize", 100)
- val fromLine = getAs("fromLine", 1)
+
val ignoreKeywords = getAs("ignoreKeywords", "")
val ignoreKeywordList = if (StringUtils.isNotEmpty(ignoreKeywords)) ignoreKeywords.split(",") else Array.empty[String]
val onlyKeywords = getAs("onlyKeywords", "")
val onlyKeywordList = if (StringUtils.isNotEmpty(onlyKeywords)) onlyKeywords.split(",") else Array.empty[String]
- val reader = new RandomAccessFile(logPath, "r")
+ var randomReader: RandomAccessFile = null
+ var reversedReader: ReversedLinesFileReader = null
+ if (enableTail) {
+ logger.info("enable log operator from tail to read")
+ reversedReader = new ReversedLinesFileReader(logPath, Charset.defaultCharset())
+ } else {
+ randomReader = new RandomAccessFile(logPath, "r")
+ }
+ def randomAndReversedReadLine(): String = {
+ if (null != randomReader) {
+ randomReader.readLine()
+ } else {
+ reversedReader.readLine()
+ }
+ }
val logs = new util.ArrayList[String](pageSize)
var readLine, skippedLine, lineNum = 0
var rowIgnore = false
@@ -67,7 +87,7 @@ class EngineConnLogOperator extends Operator with Logging {
}
val maxMultiline = EngineConnLogOperator.MULTILINE_MAX.getValue
Utils.tryFinally {
- var line = reader.readLine()
+ var line = randomAndReversedReadLine()
while (readLine < pageSize && line != null) {
lineNum += 1
if (skippedLine < fromLine - 1) {
@@ -95,12 +115,17 @@ class EngineConnLogOperator extends Operator with Logging {
readLine += 1
}
}
- line = reader.readLine
+ line = randomAndReversedReadLine()
}
- }(IOUtils.closeQuietly(reader))
+ }{
+ IOUtils.closeQuietly(randomReader)
+ IOUtils.closeQuietly(reversedReader)
+ }
+ if (enableTail) Collections.reverse(logs)
Map("logPath" -> logPath.getPath, "logs" -> logs, "endLine" -> lineNum, "rows" -> readLine)
}
+
private def includeLine(line: String,
onlyKeywordList: Array[String], ignoreKeywordList: Array[String]): Boolean = {
var accept: Boolean = ignoreKeywordList.isEmpty || !ignoreKeywordList.exists(line.contains)
@@ -114,27 +139,35 @@ class EngineConnLogOperator extends Operator with Logging {
engineConnListService = DataWorkCloudApplication.getApplicationContext.getBean(classOf[EngineConnListService])
localDirsHandleService = DataWorkCloudApplication.getApplicationContext.getBean(classOf[LocalDirsHandleService])
}
- val engineConnInstance = getAs(ECMOperateRequest.ENGINE_CONN_INSTANCE_KEY, getAs[String]("engineConnInstance", null))
- val (engineConnLogDir, ticketId) = Option(engineConnInstance).flatMap { instance =>
- engineConnListService.getEngineConns.asScala.find(_.getServiceInstance.getInstance == instance)
- }.map(engineConn => (engineConn.getEngineConnManagerEnv.engineConnLogDirs, engineConn.getTickedId)).getOrElse {
+ val logDIrSuffix = getAs("logDirSuffix", "")
+ val (engineConnLogDir, ticketId) = if (StringUtils.isNotBlank(logDIrSuffix)) {
+ val ecLogPath = ECMConfiguration.ENGINECONN_ROOT_DIR + File.pathSeparator + logDIrSuffix
val ticketId = getAs("ticketId", "")
- if (StringUtils.isBlank(ticketId)) {
- throw new ECMErrorException(ECMErrorCode.EC_FETCH_LOG_FAILED, s"the parameters of ${ECMOperateRequest.ENGINE_CONN_INSTANCE_KEY}, engineConnInstance and ticketId are both not exists.")
- }
- val logDir = engineConnListService.getEngineConn(ticketId).map(_.getEngineConnManagerEnv.engineConnLogDirs)
- .getOrElse {
- val creator = getAsThrow[String]("creator")
- val engineConnType = getAsThrow[String]("engineConnType")
- localDirsHandleService.getEngineConnLogDir(creator, ticketId, engineConnType)
+ (ecLogPath, ticketId)
+ } else {
+ val engineConnInstance = getAs(ECMOperateRequest.ENGINE_CONN_INSTANCE_KEY, getAs[String]("engineConnInstance", null))
+ Option(engineConnInstance).flatMap { instance =>
+ engineConnListService.getEngineConns.asScala.find(_.getServiceInstance.getInstance == instance)
+ }.map(engineConn => (engineConn.getEngineConnManagerEnv.engineConnLogDirs, engineConn.getTickedId)).getOrElse {
+ val ticketId = getAs("ticketId", "")
+ if (StringUtils.isBlank(ticketId)) {
+ throw new ECMErrorException(ECMErrorCode.EC_FETCH_LOG_FAILED, s"the parameters of ${ECMOperateRequest.ENGINE_CONN_INSTANCE_KEY}, engineConnInstance and ticketId are both not exists.")
}
- (logDir, ticketId)
+ val logDir = engineConnListService.getEngineConn(ticketId).map(_.getEngineConnManagerEnv.engineConnLogDirs)
+ .getOrElse {
+ val creator = getAsThrow[String]("creator")
+ val engineConnType = getAsThrow[String]("engineConnType")
+ localDirsHandleService.getEngineConnLogDir(creator, ticketId, engineConnType)
+ }
+ (logDir, ticketId)
+ }
}
+
val logPath = new File(engineConnLogDir, getAs("logType", EngineConnLogOperator.LOG_FILE_NAME.getValue));
if (!logPath.exists() || !logPath.isFile) {
throw new ECMErrorException(ECMErrorCode.EC_FETCH_LOG_FAILED, s"LogFile $logPath is not exists or is not a file.")
}
- info(s"Try to fetch EngineConn(id: $ticketId, instance: $engineConnInstance) logs from ${logPath.getPath}.")
+ info(s"Try to fetch EngineConn(id: $ticketId, logs from ${logPath.getPath}.")
logPath
}
}
@@ -143,6 +176,8 @@ object EngineConnLogOperator {
val OPERATOR_NAME = "engineConnLog"
val LOG_FILE_NAME = CommonVars("linkis.engineconn.log.filename", "stdout")
val MAX_LOG_FETCH_SIZE = CommonVars("linkis.engineconn.log.fetch.lines.max", 5000)
+
+ val MAX_LOG_TAIL_START_SIZE = CommonVars("linkis.engineconn.log.tail.start.size", 20000)
// yyyy-MM-dd HH:mm:ss.SSS
val MULTILINE_PATTERN = CommonVars("linkis.engineconn.log.multiline.pattern", "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}")
val MULTILINE_MAX = CommonVars("linkis.engineconn.log.multiline.max", 500)
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultLocalDirsHandleService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultLocalDirsHandleService.scala
index 5f0c84638..49ad1e73f 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultLocalDirsHandleService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultLocalDirsHandleService.scala
@@ -17,14 +17,12 @@
package org.apache.linkis.ecm.server.service.impl
-import java.io.File
-import java.nio.file.Paths
-
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.ecm.server.conf.ECMConfiguration._
import org.apache.linkis.ecm.server.service.LocalDirsHandleService
-import org.apache.commons.lang.StringUtils
-import org.apache.commons.lang.time.DateFormatUtils
+import org.apache.linkis.governance.common.utils.ECPathUtils
+
+import java.io.File
class DefaultLocalDirsHandleService extends LocalDirsHandleService {
@@ -36,12 +34,9 @@ class DefaultLocalDirsHandleService extends LocalDirsHandleService {
override def getEngineConnManagerHomeDir: String = ECM_HOME_DIR
override def getEngineConnWorkDir(user: String, ticketId: String, engineType: String): String = {
- val prefix = if (StringUtils.isBlank(engineType)) {
- Paths.get(ENGINECONN_ROOT_DIR, user, DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd")).toFile.getPath
- } else {
- Paths.get(ENGINECONN_ROOT_DIR, user, DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd"), engineType).toFile.getPath
- }
- new FsPath(prefix + File.separator + ticketId).getPath
+ val prefix = ENGINECONN_ROOT_DIR
+ val suffix = ECPathUtils.getECWOrkDirPathSuffix(user, ticketId, engineType)
+ new FsPath(prefix + File.separator + suffix).getPath
}
override def getEngineConnPublicDir: String = ENGINECONN_PUBLIC_DIR
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CSEntranceInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CSEntranceInterceptor.scala
index 37ee37d1f..1440ba49c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CSEntranceInterceptor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CSEntranceInterceptor.scala
@@ -31,7 +31,7 @@ class CSEntranceInterceptor extends EntranceInterceptor with Logging {
override def apply(task: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
logger.debug("Start to execute CSEntranceInterceptor")
Utils.tryAndWarn(CSEntranceHelper.addCSVariable(task))
- Utils.tryAndWarn(CSEntranceHelper.resetCreator(task))
+ //Utils.tryAndWarn(CSEntranceHelper.resetCreator(task))
Utils.tryAndWarn(CSEntranceHelper.initNodeCSInfo(task))
logger.debug("Finished to execute CSEntranceInterceptor")
task
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
new file mode 100644
index 000000000..14167ee48
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.restful;
+
+import org.apache.linkis.common.conf.Configuration;
+import org.apache.linkis.manager.am.exception.AMErrorException;
+import org.apache.linkis.manager.am.service.ECResourceInfoService;
+import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
+import org.apache.linkis.server.Message;
+import org.apache.linkis.server.utils.ModuleUserUtils;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletRequest;
+
+@RequestMapping(
+ path = "/linkisManager/ecinfo",
+ produces = {"application/json"})
+@RestController
+public class ECResourceInfoRestfulApi {
+ @Autowired private ECResourceInfoService ecResourceInfoService;
+
+ @RequestMapping(path = "/get", method = RequestMethod.GET)
+ public Message getECInfo(
+ HttpServletRequest req, @RequestParam(value = "ticketid") String ticketid)
+ throws AMErrorException {
+ String userName = ModuleUserUtils.getOperationUser(req, "getECInfo");
+ ECResourceInfoRecord ecResourceInfoRecord =
+ ecResourceInfoService.getECResourceInfoRecord(ticketid);
+ if (null != ecResourceInfoRecord
+ && (userName.equalsIgnoreCase(ecResourceInfoRecord.getCreateUser())
+ || Configuration.isAdmin(userName))) {
+ return Message.ok().data("ecResourceInfoRecord", ecResourceInfoRecord);
+ } else {
+ return Message.error("tickedId not exist:" + ticketid);
+ }
+ }
+
+ @RequestMapping(path = "/delete/{ticketid}}", method = RequestMethod.DELETE)
+ public Message deleteECInfo(HttpServletRequest req, @PathVariable("ticketid") String ticketid)
+ throws AMErrorException {
+ String userName = ModuleUserUtils.getOperationUser(req, "deleteECInfo");
+ ECResourceInfoRecord ecResourceInfoRecord =
+ ecResourceInfoService.getECResourceInfoRecord(ticketid);
+ if (null != ecResourceInfoRecord
+ && (userName.equalsIgnoreCase(ecResourceInfoRecord.getCreateUser())
+ || Configuration.isAdmin(userName))) {
+ ecResourceInfoService.deleteECResourceInfoRecord(ecResourceInfoRecord.getId());
+ return Message.ok().data("ecResourceInfoRecord", ecResourceInfoRecord);
+ } else {
+ return Message.error("tickedId not exist:" + ticketid);
+ }
+ }
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
index 077177c67..506ef73f1 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
@@ -319,6 +319,9 @@ public class EMRestfulApi {
}
parameters.put(OperateRequest$.MODULE$.OPERATOR_NAME_KEY(), "engineConnLog");
parameters.put(ECMOperateRequest$.MODULE$.ENGINE_CONN_INSTANCE_KEY(), engineInstance);
+ if (!parameters.containsKey("enableTail")) {
+ parameters.put("enableTail", true);
+ }
} catch (JsonProcessingException e) {
logger.error(
"Fail to process the operation parameters: [{}] in request",
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/ECResourceInfoService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/ECResourceInfoService.java
new file mode 100644
index 000000000..12ccdd86d
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/ECResourceInfoService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.service;
+
+import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
+
+public interface ECResourceInfoService {
+
+ ECResourceInfoRecord getECResourceInfoRecord(String ticketId);
+
+ void deleteECResourceInfoRecordByTicketId(String ticketId);
+
+ void deleteECResourceInfoRecord(Integer id);
+
+ // TODO add search method
+
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
new file mode 100644
index 000000000..9526b8802
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.service.impl;
+
+import org.apache.linkis.manager.am.restful.EMRestfulApi;
+import org.apache.linkis.manager.am.service.ECResourceInfoService;
+import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
+import org.apache.linkis.manager.dao.ECResourceRecordMapper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Service
+public class ECResourceInfoServiceImpl implements ECResourceInfoService {
+
+ private static final Logger logger = LoggerFactory.getLogger(EMRestfulApi.class);
+
+ @Autowired private ECResourceRecordMapper ecResourceRecordMapper;
+
+ @Override
+ public ECResourceInfoRecord getECResourceInfoRecord(String ticketId) {
+ if (StringUtils.isNoneBlank(ticketId)) {
+ return ecResourceRecordMapper.getECResourceInfoRecord(ticketId);
+ }
+ return null;
+ }
+
+ @Override
+ public void deleteECResourceInfoRecordByTicketId(String ticketId) {
+ if (StringUtils.isNoneBlank(ticketId)) {
+ logger.info("Start to delete ec:{} info ", ticketId);
+ ecResourceRecordMapper.deleteECResourceInfoRecordByTicketId(ticketId);
+ }
+ }
+
+ @Override
+ public void deleteECResourceInfoRecord(Integer id) {
+ logger.info("Start to delete ec id:{} info ", id);
+ ecResourceRecordMapper.deleteECResourceInfoRecord(id);
+ }
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-commons/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java b/linkis-computation-governance/linkis-manager/linkis-manager-commons/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
new file mode 100644
index 000000000..267190a83
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-commons/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.common.entity.persistence;
+
+import org.apache.linkis.manager.common.entity.resource.Resource;
+import org.apache.linkis.manager.common.utils.ResourceUtils;
+
+import java.util.Date;
+
+public class ECResourceInfoRecord {
+
+ private Integer id;
+
+ private String createUser;
+
+ private String labelValue;
+
+ private String ticketId;
+
+ private String serviceInstance;
+
+ private String usedResource;
+
+ private String requestResource;
+
+ private String releasedResource;
+
+ private String ecmInstance;
+
+ private int requestTimes;
+ private int usedTimes;
+ private int releaseTimes;
+
+ private Date usedTime;
+
+ private Date createTime;
+
+ private Date releaseTime;
+
+ private String logDirSuffix;
+
+ public ECResourceInfoRecord() {}
+
+ public ECResourceInfoRecord(
+ String labelValue,
+ String createUser,
+ String ticketId,
+ Resource resource,
+ String logDirSuffix) {
+ this.labelValue = labelValue;
+ this.ticketId = ticketId;
+ this.createUser = createUser;
+ if (null != resource) {
+ this.requestResource = ResourceUtils.serializeResource(resource);
+ }
+ this.logDirSuffix = logDirSuffix;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ public String getLabelValue() {
+ return labelValue;
+ }
+
+ public void setLabelValue(String labelValue) {
+ this.labelValue = labelValue;
+ }
+
+ public String getTicketId() {
+ return ticketId;
+ }
+
+ public void setTicketId(String ticketId) {
+ this.ticketId = ticketId;
+ }
+
+ public String getServiceInstance() {
+ return serviceInstance;
+ }
+
+ public void setServiceInstance(String serviceInstance) {
+ this.serviceInstance = serviceInstance;
+ }
+
+ public String getUsedResource() {
+ return usedResource;
+ }
+
+ public void setUsedResource(String usedResource) {
+ this.usedResource = usedResource;
+ }
+
+ public Date getUsedTime() {
+ return usedTime;
+ }
+
+ public void setUsedTime(Date usedTime) {
+ this.usedTime = usedTime;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getRequestResource() {
+ return requestResource;
+ }
+
+ public void setRequestResource(String requestResource) {
+ this.requestResource = requestResource;
+ }
+
+ public String getReleasedResource() {
+ return releasedResource;
+ }
+
+ public void setReleasedResource(String releasedResource) {
+ this.releasedResource = releasedResource;
+ }
+
+ public int getRequestTimes() {
+ return requestTimes;
+ }
+
+ public void setRequestTimes(int requestTimes) {
+ this.requestTimes = requestTimes;
+ }
+
+ public int getUsedTimes() {
+ return usedTimes;
+ }
+
+ public void setUsedTimes(int usedTimes) {
+ this.usedTimes = usedTimes;
+ }
+
+ public int getReleaseTimes() {
+ return releaseTimes;
+ }
+
+ public void setReleaseTimes(int releaseTimes) {
+ this.releaseTimes = releaseTimes;
+ }
+
+ public Date getReleaseTime() {
+ return releaseTime;
+ }
+
+ public void setReleaseTime(Date releaseTime) {
+ this.releaseTime = releaseTime;
+ }
+
+ public String getEcmInstance() {
+ return ecmInstance;
+ }
+
+ public void setEcmInstance(String ecmInstance) {
+ this.ecmInstance = ecmInstance;
+ }
+
+ public String getLogDirSuffix() {
+ return logDirSuffix;
+ }
+
+ public void setLogDirSuffix(String logDirSuffix) {
+ this.logDirSuffix = logDirSuffix;
+ }
+
+ public String getCreateUser() {
+ return createUser;
+ }
+
+ public void setCreateUser(String createUser) {
+ this.createUser = createUser;
+ }
+
+ @Override
+ public String toString() {
+ return "ECResourceInfoRecord{"
+ + "createUser='"
+ + createUser
+ + '\''
+ + ", ticketId='"
+ + ticketId
+ + '\''
+ + ", serviceInstance='"
+ + serviceInstance
+ + '\''
+ + '}';
+ }
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-commons/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceResourceActionRecord.java b/linkis-computation-governance/linkis-manager/linkis-manager-commons/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceResourceActionRecord.java
deleted file mode 100644
index 7540ce30b..000000000
--- a/linkis-computation-governance/linkis-manager/linkis-manager-commons/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceResourceActionRecord.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.common.entity.persistence;
-
-import org.apache.linkis.manager.common.entity.resource.Resource;
-import org.apache.linkis.manager.common.entity.resource.ResourceType;
-import org.apache.linkis.manager.common.utils.ResourceUtils;
-
-import java.util.Date;
-
-public class PersistenceResourceActionRecord {
-
- private Integer id;
-
- private String labelValue;
-
- private String ticketId;
-
- private Integer requestTimes;
-
- private String requestResourceAll;
-
- private Integer usedTimes;
-
- private String usedResourceAll;
-
- private Integer releaseTimes;
-
- private String releaseResourceAll;
-
- private Date updateTime;
-
- private Date createTime;
-
- public PersistenceResourceActionRecord() {}
-
- public PersistenceResourceActionRecord(String labelValue, String ticketId, Resource resource) {
- this.labelValue = labelValue;
- this.ticketId = ticketId;
- this.requestTimes = 0;
- this.requestResourceAll =
- ResourceUtils.serializeResource(Resource.initResource(ResourceType.LoadInstance));
- this.usedTimes = 0;
- this.usedResourceAll =
- ResourceUtils.serializeResource(Resource.initResource(ResourceType.LoadInstance));
- this.releaseTimes = 0;
- this.releaseResourceAll =
- ResourceUtils.serializeResource(Resource.initResource(ResourceType.LoadInstance));
- this.updateTime = new Date(System.currentTimeMillis());
- this.createTime = new Date(System.currentTimeMillis());
- }
-
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
- public String getLabelValue() {
- return labelValue;
- }
-
- public void setLabelValue(String labelValue) {
- this.labelValue = labelValue;
- }
-
- public String getTicketId() {
- return ticketId;
- }
-
- public void setTicketId(String ticketId) {
- this.ticketId = ticketId;
- }
-
- public Integer getRequestTimes() {
- return requestTimes;
- }
-
- public void setRequestTimes(Integer requestTimes) {
- this.requestTimes = requestTimes;
- }
-
- public String getRequestResourceAll() {
- return requestResourceAll;
- }
-
- public void setRequestResourceAll(String requestResourceAll) {
- this.requestResourceAll = requestResourceAll;
- }
-
- public Integer getUsedTimes() {
- return usedTimes;
- }
-
- public void setUsedTimes(Integer usedTimes) {
- this.usedTimes = usedTimes;
- }
-
- public String getUsedResourceAll() {
- return usedResourceAll;
- }
-
- public void setUsedResourceAll(String usedResourceAll) {
- this.usedResourceAll = usedResourceAll;
- }
-
- public Integer getReleaseTimes() {
- return releaseTimes;
- }
-
- public void setReleaseTimes(Integer releaseTimes) {
- this.releaseTimes = releaseTimes;
- }
-
- public String getReleaseResourceAll() {
- return releaseResourceAll;
- }
-
- public void setReleaseResourceAll(String releaseResourceAll) {
- this.releaseResourceAll = releaseResourceAll;
- }
-
- public Date getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(Date updateTime) {
- this.updateTime = updateTime;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ECResourceRecordMapper.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ECResourceRecordMapper.java
new file mode 100644
index 000000000..45359d56d
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ECResourceRecordMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.dao;
+
+import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
+
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+@Mapper
+public interface ECResourceRecordMapper {
+
+ ECResourceInfoRecord getECResourceInfoRecord(@Param("ticketId") String ticketId);
+
+ void updateECResourceInfoRecord(ECResourceInfoRecord resourceActionRecord);
+
+ void insertECResourceInfoRecord(ECResourceInfoRecord resourceActionRecord);
+
+ void deleteECResourceInfoRecordByTicketId(@Param("ticketId") String ticketId);
+
+ void deleteECResourceInfoRecord(@Param("id") Integer id);
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ResourceManagerMapper.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ResourceManagerMapper.java
index 071442ee3..949425a62 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ResourceManagerMapper.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/ResourceManagerMapper.java
@@ -19,13 +19,11 @@ package org.apache.linkis.manager.dao;
import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
import org.apache.linkis.manager.common.entity.persistence.PersistenceResource;
-import org.apache.linkis.manager.common.entity.persistence.PersistenceResourceActionRecord;
import org.apache.ibatis.annotations.*;
import java.util.List;
-@Mapper
public interface ResourceManagerMapper {
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
// @SelectKey(statement = "select last_insert_id() AS id", keyProperty = "id", before = false,
@@ -103,12 +101,4 @@ public interface ResourceManagerMapper {
void deleteResourceRelByResourceId(@Param("ids") List<Integer> ids);
PersistenceResource getResourceById(@Param("id") Integer id);
-
- PersistenceResourceActionRecord getResourceActionRecord(@Param("ticketId") String ticketId);
-
- void updateResourceActionRecord(
- @Param("resourceActionRecord") PersistenceResourceActionRecord resourceActionRecord);
-
- PersistenceResourceActionRecord insertResourceActionRecord(
- @Param("resourceActionRecord") PersistenceResourceActionRecord resourceActionRecord);
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml
new file mode 100644
index 000000000..12d50fbd3
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+
+<mapper namespace="org.apache.linkis.manager.dao.ECResourceRecordMapper">
+
+ <select id="getECResourceInfoRecord" resultType="org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord">
+ SELECT * FROM linkis_cg_ec_resource_info_record WHERE ticket_id = #{ticketId}
+ </select>
+
+
+ <insert id="insertECResourceInfoRecord" flushCache="true" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord">
+ INSERT INTO linkis_cg_ec_resource_info_record(label_value,create_user, ticket_id, service_instance, request_times, request_resource,
+ used_times, used_resource, release_times, released_resource,release_time,used_time,ecm_instance,log_dir_suffix)
+ VALUES (#{labelValue},#{createUser},#{ticketId},#{serviceInstance},#{requestTimes},
+ #{requestResource},#{usedTimes},#{usedResource},
+ #{releaseTimes},#{releasedResource},#{releaseTime},#{usedTime},#{ecmInstance},#{logDirSuffix})
+ </insert>
+
+ <update id="updateECResourceInfoRecord" flushCache="true" parameterType="org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord">
+ UPDATE linkis_cg_ec_resource_info_record
+ <trim prefix="set" suffixOverrides=",">
+ <if test="serviceInstance != null">service_instance = #{serviceInstance},</if>
+ <if test="requestTimes != null">request_times = #{requestTimes},</if>
+ <if test="usedTimes != null">used_times = #{usedTimes},</if>
+ <if test="releaseTimes != null">release_times = #{releaseTimes},</if>
+ <if test="usedTime != null">used_time = #{usedTime},</if>
+ <if test="releaseTime != null">release_time = #{releaseTime},</if>
+ <if test="usedResource != null">used_resource = #{usedResource},</if>
+ <if test="requestResource != null">request_resource = #{requestResource},</if>
+ <if test="releasedResource != null">released_resource = #{releasedResource},</if>
+ <if test="ecmInstance != null">ecm_instance = #{ecmInstance},</if>
+ </trim>
+ <![CDATA[
+ WHERE id = #{id}
+ ]]>
+ </update>
+
+ <delete id="deleteECResourceInfoRecordByTicketId">
+ delete from linkis_cg_ec_resource_info_record WHERE ticket_id = #{ticketId}
+ </delete>
+
+ <delete id="deleteECResourceInfoRecord">
+ delete from linkis_cg_ec_resource_info_record WHERE id = #{id}
+ </delete>
+
+</mapper>
\ No newline at end of file
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ResourceManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ResourceManagerMapper.xml
index e36115afb..09e40887b 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ResourceManagerMapper.xml
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ResourceManagerMapper.xml
@@ -38,24 +38,5 @@
SELECT * FROM linkis_cg_manager_linkis_resources WHERE id = #{id}
</select>
- <select id="getResourceActionRecord" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceResourceActionRecord">
- SELECT * FROM linkis_cg_rm_resource_action_record WHERE ticket_id = #{ticketId}
- </select>
-
- <update id="updateResourceActionRecord">
- UPDATE linkis_cg_rm_resource_action_record
- SET request_times = #{resourceActionRecord.requestTimes}, request_resource_all = #{resourceActionRecord.requestResourceAll},
- used_times = #{resourceActionRecord.usedTimes}, used_resource_all = #{resourceActionRecord.usedResourceAll},
- release_times = #{resourceActionRecord.releaseTimes}, release_resource_all = #{resourceActionRecord.releaseResourceAll},
- update_time = now()
- WHERE ticket_id = #{resourceActionRecord.ticketId}
- </update>
- <insert id="insertResourceActionRecord" useGeneratedKeys="true" keyProperty="id">
- INSERT INTO linkis_cg_rm_resource_action_record (label_value, ticket_id, request_times, request_resource_all,
- used_times, used_resource_all, release_times, release_resource_all)
- VALUES (#{resourceActionRecord.labelValue}, #{resourceActionRecord.ticketId}, #{resourceActionRecord.requestTimes},
- #{resourceActionRecord.requestResourceAll}, #{resourceActionRecord.usedTimes}, #{resourceActionRecord.usedResourceAll},
- #{resourceActionRecord.releaseTimes}, #{resourceActionRecord.releaseResourceAll})
- </insert>
</mapper>
\ No newline at end of file
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/ResourceManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/ResourceManagerPersistence.java
index 3de7e300a..65b7c1d47 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/ResourceManagerPersistence.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/ResourceManagerPersistence.java
@@ -20,7 +20,6 @@ package org.apache.linkis.manager.persistence;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
import org.apache.linkis.manager.common.entity.persistence.PersistenceResource;
-import org.apache.linkis.manager.common.entity.persistence.PersistenceResourceActionRecord;
import org.apache.linkis.manager.exception.PersistenceErrorException;
import org.apache.linkis.manager.label.entity.Label;
@@ -137,10 +136,4 @@ public interface ResourceManagerPersistence {
void deleteResourceRelByResourceId(List<Integer> id);
PersistenceResource getNodeResourceById(Integer id);
-
- PersistenceResourceActionRecord getResourceActionRecord(String ticketId);
-
- void insertResourceActionRecord(PersistenceResourceActionRecord resourceActionRecord);
-
- void updateResourceActionRecord(PersistenceResourceActionRecord resourceActionRecord);
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultResourceManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultResourceManagerPersistence.java
index f52898131..e76eb0d49 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultResourceManagerPersistence.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultResourceManagerPersistence.java
@@ -20,7 +20,6 @@ package org.apache.linkis.manager.persistence.impl;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
import org.apache.linkis.manager.common.entity.persistence.PersistenceResource;
-import org.apache.linkis.manager.common.entity.persistence.PersistenceResourceActionRecord;
import org.apache.linkis.manager.dao.LabelManagerMapper;
import org.apache.linkis.manager.dao.NodeManagerMapper;
import org.apache.linkis.manager.dao.ResourceManagerMapper;
@@ -202,19 +201,4 @@ public class DefaultResourceManagerPersistence implements ResourceManagerPersist
PersistenceResource resource = resourceManagerMapper.getResourceById(id);
return resource;
}
-
- @Override
- public PersistenceResourceActionRecord getResourceActionRecord(String ticketId) {
- return resourceManagerMapper.getResourceActionRecord(ticketId);
- }
-
- @Override
- public void insertResourceActionRecord(PersistenceResourceActionRecord resourceActionRecord) {
- resourceManagerMapper.insertResourceActionRecord(resourceActionRecord);
- }
-
- @Override
- public void updateResourceActionRecord(PersistenceResourceActionRecord resourceActionRecord) {
- resourceManagerMapper.updateResourceActionRecord(resourceActionRecord);
- }
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/DefaultResourceManager.scala
index c2644c64e..d948633b5 100644
--- a/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/DefaultResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/DefaultResourceManager.scala
@@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.common.conf.RMConfiguration
import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, InfoRMNode}
-import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel
+import org.apache.linkis.manager.common.entity.persistence.{PersistenceLabel, PersistenceResource}
import org.apache.linkis.manager.common.entity.resource.{NodeResource, Resource, ResourceType}
import org.apache.linkis.manager.common.exception.{RMErrorException, RMWarnException}
import org.apache.linkis.manager.common.utils.{ManagerUtils, ResourceUtils}
@@ -54,25 +54,25 @@ import scala.collection.JavaConversions._
class DefaultResourceManager extends ResourceManager with Logging with InitializingBean {
@Autowired
- var resourceManagerPersistence: ResourceManagerPersistence = _
+ private var resourceManagerPersistence: ResourceManagerPersistence = _
@Autowired
- var nodeManagerPersistence: NodeManagerPersistence = _
+ private var nodeManagerPersistence: NodeManagerPersistence = _
@Autowired
- var resourceLockService: ResourceLockService = _
+ private var resourceLockService: ResourceLockService = _
@Autowired
- var labelResourceService: LabelResourceService = _
+ private var labelResourceService: LabelResourceService = _
@Autowired
- var externalResourceService: ExternalResourceService = _
+ private var externalResourceService: ExternalResourceService = _
@Autowired
- var resourceLogService: ResourceLogService = _
+ private var resourceLogService: ResourceLogService = _
@Autowired
- var labelManagerPersistence: LabelManagerPersistence = _
+ private var labelManagerPersistence: LabelManagerPersistence = _
@Autowired
private var nodeMetricManagerPersistence: NodeMetricManagerPersistence = _
@@ -80,11 +80,11 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
@Autowired
private var nodeLabelService: NodeLabelService = _
- var requestResourceServices: Array[RequestResourceService] = _
+ private var requestResourceServices: Array[RequestResourceService] = _
- val gson = BDPJettyServerHelper.gson
+ private val gson = BDPJettyServerHelper.gson
- val labelFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
+ private val labelFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
override def afterPropertiesSet(): Unit = {
@@ -132,10 +132,11 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
// TODO get ID Label set label resource
Utils.tryCatch {
labelResourceService.setLabelResource(eMInstanceLabel, resource, eMInstanceLabel.getStringValue)
- resourceLogService.success(ChangeType.ECM_INIT, null, eMInstanceLabel)
+ resourceLogService.success(ChangeType.ECM_INIT, resource.getMaxResource, null, eMInstanceLabel)
} {
case exception: Exception => {
- resourceLogService.failed(ChangeType.ECM_INIT, null, eMInstanceLabel, exception)
+ resourceLogService.failed(ChangeType.ECM_INIT, resource.getMaxResource, null, eMInstanceLabel, exception)
+ throw exception
}
case _ =>
}
@@ -168,10 +169,10 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
labelResourceService.removeResourceByLabel(eMInstanceLabel)
labelContainer.setCurrentLabel(eMInstanceLabel)
resourceLockService.unLock(labelContainer)
- resourceLogService.success(ChangeType.ECM_CLEAR, null, eMInstanceLabel)
+ resourceLogService.success(ChangeType.ECM_CLEAR, Resource.initResource(ResourceType.LoadInstance), null, eMInstanceLabel)
} {
case exception: Exception => {
- resourceLogService.failed(ChangeType.ECM_CLEAR, null, eMInstanceLabel, exception)
+ resourceLogService.failed(ChangeType.ECM_CLEAR, Resource.initResource(ResourceType.LoadInstance), null, eMInstanceLabel, exception)
}
case _ =>
}
@@ -248,7 +249,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
logger.info(s"ResourceChanged:${label.getStringValue} --> ${labelResource}")
resourceCheck(label, labelResource)
if(label.getClass.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass)) {
- resourceLogService.recordUserResourceAction(labelContainer.getCombinedUserCreatorEngineTypeLabel, tickedId, ChangeType.ENGINE_REQUEST, resource.getLockedResource)
+ resourceLogService.recordUserResourceAction(labelContainer, tickedId, ChangeType.ENGINE_REQUEST, resource.getLockedResource)
}
}
case _ =>
@@ -308,8 +309,10 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
override def resourceUsed(labels: util.List[Label[_]], usedResource: NodeResource): Unit = {
val labelContainer = labelResourceService.enrichLabels(labels)
var lockedResource: NodeResource = null
+ var persistenceResource: PersistenceResource = null
try {
- lockedResource = labelResourceService.getLabelResource(labelContainer.getEngineInstanceLabel)
+ persistenceResource = labelResourceService.getPersistenceResource(labelContainer.getEngineInstanceLabel)
+ lockedResource = ResourceUtils.fromPersistenceResource(persistenceResource)
} catch {
case e: NullPointerException =>
error(s"EngineInstanceLabel [${labelContainer.getEngineInstanceLabel}] cause NullPointerException")
@@ -338,7 +341,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
lockedResource.setUsedResource(lockedResource.getLockedResource)
lockedResource.setLockedResource(Resource.getZeroResource(lockedResource.getLockedResource))
labelResourceService.setLabelResource(engineInstanceLabel, lockedResource, labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue)
- resourceLogService.success(ChangeType.ENGINE_INIT, engineInstanceLabel)
+ resourceLogService.success(ChangeType.ENGINE_INIT, lockedResource.getLockedResource, engineInstanceLabel)
} {
case exception: Exception => {
error(s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}", exception)
@@ -354,11 +357,11 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
labelResourceService.setLabelResource(label, labelResource, labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue)
label match {
case emLabel: EMInstanceLabel =>
- resourceLogService.success(ChangeType.ECM_RESOURCE_ADD, null, emLabel)
+ resourceLogService.success(ChangeType.ECM_RESOURCE_ADD, lockedResource.getUsedResource, null, emLabel)
case _ =>
}
if(label.getClass.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass)) {
- resourceLogService.recordUserResourceAction(labelContainer.getCombinedUserCreatorEngineTypeLabel, labelContainer.getEngineInstanceLabel, ChangeType.ENGINE_INIT, addedResource)
+ resourceLogService.recordUserResourceAction(labelContainer, persistenceResource.getTicketId, ChangeType.ENGINE_INIT, addedResource)
}
resourceCheck(label, labelResource)
}
@@ -391,7 +394,8 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
*/
override def resourceReleased(labels: util.List[Label[_]]): Unit = {
val labelContainer = labelResourceService.enrichLabels(labels)
- val usedResource = labelResourceService.getLabelResource(labelContainer.getEngineInstanceLabel)
+ val persistenceResource: PersistenceResource = labelResourceService.getPersistenceResource(labelContainer.getEngineInstanceLabel)
+ val usedResource = ResourceUtils.fromPersistenceResource(persistenceResource)
if (usedResource == null) {
throw new RMErrorException(RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode, s"No used resource found by engine ${labelContainer.getEngineInstanceLabel}")
}
@@ -420,29 +424,23 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
labelResource.setLeftResource(labelResource.getLeftResource + usedResource.getLockedResource)
}
labelResourceService.setLabelResource(label, labelResource, labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue)
- if(label.getClass.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass)) {
- if(usedResource.getUsedResource != null) {
- resourceLogService.recordUserResourceAction(labelContainer.getCombinedUserCreatorEngineTypeLabel, labelContainer.getEngineInstanceLabel, ChangeType.ENGINE_CLEAR, usedResource.getUsedResource)
+ if (label.getClass.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass)) {
+ if (usedResource.getUsedResource != null) {
+ resourceLogService.recordUserResourceAction(labelContainer, persistenceResource.getTicketId, ChangeType.ENGINE_CLEAR, usedResource.getUsedResource)
} else if (usedResource.getLockedResource != null) {
- resourceLogService.recordUserResourceAction(labelContainer.getCombinedUserCreatorEngineTypeLabel, labelContainer.getEngineInstanceLabel, ChangeType.ENGINE_CLEAR, usedResource.getLockedResource)
+ resourceLogService.recordUserResourceAction(labelContainer, persistenceResource.getTicketId, ChangeType.ENGINE_CLEAR, usedResource.getLockedResource)
}
}
label match {
case emLabel: EMInstanceLabel =>
- resourceLogService.success(ChangeType.ECM_Resource_MINUS, null, emLabel)
+ resourceLogService.success(ChangeType.ECM_Resource_MINUS, labelResource.getUsedResource, null, emLabel)
case _ =>
}
resourceCheck(label, labelResource)
}
} {
- case exception: Exception => {
- label match {
- case emLabel: EMInstanceLabel =>
- resourceLogService.failed(ChangeType.ECM_Resource_MINUS, null, emLabel, exception)
- case _ =>
- }
- }
- case _ =>
+ case exception: Exception =>
+ logger.warn(s"Failed to release resource label ${label.getStringValue} resource ${usedResource.getUsedResource.toJson}")
}
case _ =>
}
@@ -451,10 +449,10 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
val engineInstanceLabel = tmpLabel.asInstanceOf[EngineInstanceLabel]
Utils.tryCatch {
labelResourceService.removeResourceByLabel(engineInstanceLabel)
- resourceLogService.success(ChangeType.ENGINE_CLEAR, engineInstanceLabel, null)
+ resourceLogService.success(ChangeType.ENGINE_CLEAR, usedResource.getUsedResource, engineInstanceLabel, null)
} {
case exception: Exception => {
- resourceLogService.failed(ChangeType.ENGINE_CLEAR, engineInstanceLabel, null, exception)
+ resourceLogService.failed(ChangeType.ENGINE_CLEAR, usedResource.getUsedResource, engineInstanceLabel, null, exception)
throw exception
}
case _ =>
@@ -535,17 +533,6 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
}
-
-// private def askAgainAfter(interval: Long): Unit = {
-// val realInterval = if (interval <= 0) RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue else interval
-// Utils.defaultScheduler.schedule(
-// new UnlockTimeoutResourceRunnable(labels, engineInstanceLabel, ticketId),
-// realInterval,
-// TimeUnit.MILLISECONDS
-// )
-// logger.info(s"delayed resource unlocked for ${engineInstanceLabel}")
-// }
-
override def resourceReport(labels: util.List[Label[_]], reportResource: NodeResource): Unit = {
//TODO
diff --git a/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/ResourceLogService.scala b/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/ResourceLogService.scala
index ae7ad9624..48eb72ba0 100644
--- a/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/ResourceLogService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/service/impl/ResourceLogService.scala
@@ -14,114 +14,105 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.linkis.resourcemanager.service.impl
+import org.apache.commons.lang.time.DateFormatUtils
import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.manager.common.entity.persistence.PersistenceResourceActionRecord
-import org.apache.linkis.manager.common.entity.resource.{Resource, ResourceActionRecord, ResourceType}
-import org.apache.linkis.manager.common.exception.ResourceWarnException
-import org.apache.linkis.manager.common.utils.ResourceUtils
-import org.apache.linkis.manager.label.entity.{CombinedLabel, Label}
+import org.apache.linkis.governance.common.utils.ECPathUtils
+import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord
+import org.apache.linkis.manager.common.entity.resource.Resource
+import org.apache.linkis.manager.dao.ECResourceRecordMapper
+import org.apache.linkis.manager.label.entity.CombinedLabel
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel
-import org.apache.linkis.manager.persistence.ResourceManagerPersistence
-import org.apache.linkis.resourcemanager.service.LabelResourceService
+import org.apache.linkis.resourcemanager.domain.RMLabelContainer
import org.apache.linkis.resourcemanager.utils.RMUtils
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
+import java.io.File
+import java.util.Date
+
@Component
-case class ResourceLogService() extends Logging{
+class ResourceLogService extends Logging {
- @Autowired
- var labelResourceService: LabelResourceService = _
@Autowired
- var resourceManagerPersistence: ResourceManagerPersistence = _
-
+ private var ecResourceRecordMapper: ECResourceRecordMapper = _
- private def printLog(changeType: String, status: String, engineLabel: EngineInstanceLabel = null, ecmLabel: EMInstanceLabel = null): String = {
+ private def printLog(changeType: String, resource: Resource, status: String, engineLabel: EngineInstanceLabel = null, ecmLabel: EMInstanceLabel = null): String = {
val logString = new StringBuilder(changeType + " ")
logString ++= (status + ", ")
- if (engineLabel != null) {
- val engineResource = labelResourceService.getLabelResource(engineLabel)
- var usedResource = Resource.initResource(ResourceType.Default)
- if (engineResource != null && engineResource.getUsedResource != null) {
- usedResource = engineResource.getUsedResource
- }
+ if (engineLabel != null && resource != null) {
logString ++= ("engine current resource:")
logString ++= (engineLabel.getServiceInstance.getInstance)
- logString ++= (usedResource.toJson + " ")
+ logString ++= (resource.toJson + " ")
}
- if (ecmLabel != null) {
- val ecmResource = labelResourceService.getLabelResource(ecmLabel)
- var usedResource = Resource.initResource(ResourceType.Default)
- if (ecmResource != null && ecmResource.getUsedResource != null) {
- usedResource = ecmResource.getUsedResource
- }
+ if (ecmLabel != null & resource != null) {
logString ++= ("ecm current resource:")
logString ++= (ecmLabel.getServiceInstance.getInstance)
- logString ++= (usedResource.toJson + " ")
+ logString ++= (resource.toJson + " ")
}
logString.toString()
}
- def failed(changeType: String, engineLabel: EngineInstanceLabel = null, ecmLabel: EMInstanceLabel = null, exception: Exception = null): Unit = {
+ def failed(changeType: String, resource: Resource, engineLabel: EngineInstanceLabel = null, ecmLabel: EMInstanceLabel = null, exception: Exception = null): Unit = Utils.tryQuietly {
if (changeType != null) {
val log: String = changeType match {
case ChangeType.ENGINE_INIT => {
- printLog(changeType, ChangeType.FAILED, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
}
case ChangeType.ENGINE_CLEAR => {
- printLog(changeType, ChangeType.FAILED, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
}
case ChangeType.ECM_INIT => {
- printLog(changeType, ChangeType.FAILED, null, ecmLabel)
+ printLog(changeType, resource, ChangeType.FAILED, null, ecmLabel)
}
case ChangeType.ECM_CLEAR => {
- printLog(changeType, ChangeType.FAILED, null, ecmLabel)
+ printLog(changeType, resource, ChangeType.FAILED, null, ecmLabel)
}
case ChangeType.ECM_RESOURCE_ADD => {
- printLog(changeType, ChangeType.FAILED, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
}
case ChangeType.ECM_Resource_MINUS => {
- printLog(changeType, ChangeType.FAILED, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
}
case _ => " "
}
if (exception != null) {
- error(log, exception)
+ logger.error(log, exception)
} else {
- error(log)
+ logger.error(log)
}
}
}
- def success(changeType: String, engineLabel: EngineInstanceLabel = null, ecmLabel: EMInstanceLabel = null): Unit = {
+
+ def success(changeType: String, resource: Resource, engineLabel: EngineInstanceLabel = null, ecmLabel: EMInstanceLabel = null): Unit = Utils.tryQuietly {
if (changeType != null) {
val log: String = changeType match {
case ChangeType.ENGINE_INIT => {
- printLog(changeType, ChangeType.SUCCESS, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
}
case ChangeType.ENGINE_CLEAR => {
- printLog(changeType, ChangeType.SUCCESS, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
}
case ChangeType.ECM_INIT => {
- printLog(changeType, ChangeType.SUCCESS, null, ecmLabel)
+ printLog(changeType, resource, ChangeType.SUCCESS, null, ecmLabel)
}
case ChangeType.ECM_CLEAR => {
- printLog(changeType, ChangeType.SUCCESS, null, ecmLabel)
+ printLog(changeType, resource, ChangeType.SUCCESS, null, ecmLabel)
}
case ChangeType.ECM_RESOURCE_ADD => {
- printLog(changeType, ChangeType.SUCCESS, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
}
case ChangeType.ECM_Resource_MINUS => {
- printLog(changeType, ChangeType.SUCCESS, engineLabel, ecmLabel)
+ printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
}
case _ => " "
}
- info(log)
+ logger.info(log)
}
}
@@ -135,80 +126,57 @@ case class ResourceLogService() extends Logging{
def printNode(nodeLabel: EngineInstanceLabel, source: CombinedLabel): Unit = {
val log = s"${nodeLabel.getInstance()}\t${source.getStringValue}"
- info(log)
+ logger.info(log)
}
- def deserialize(persistenceResourceActionRecord: PersistenceResourceActionRecord): ResourceActionRecord = {
- val resourceActionRecord = new ResourceActionRecord
- resourceActionRecord.setId(persistenceResourceActionRecord.getId)
- resourceActionRecord.setLabelValue(persistenceResourceActionRecord.getLabelValue)
- resourceActionRecord.setTicketId(persistenceResourceActionRecord.getTicketId)
- resourceActionRecord.setRequestTimes(persistenceResourceActionRecord.getRequestTimes)
- resourceActionRecord.setRequestResourceAll(ResourceUtils.deserializeResource(persistenceResourceActionRecord.getRequestResourceAll))
- resourceActionRecord.setUsedTimes(persistenceResourceActionRecord.getUsedTimes)
- resourceActionRecord.setUsedResourceAll(ResourceUtils.deserializeResource(persistenceResourceActionRecord.getUsedResourceAll))
- resourceActionRecord.setReleaseTimes(persistenceResourceActionRecord.getReleaseTimes)
- resourceActionRecord.setReleaseResourceAll(ResourceUtils.deserializeResource(persistenceResourceActionRecord.getReleaseResourceAll))
- resourceActionRecord.setCreateTime(persistenceResourceActionRecord.getCreateTime)
- resourceActionRecord.setUpdateTime(persistenceResourceActionRecord.getUpdateTime)
- resourceActionRecord
- }
-
-
- def serialize(resourceActionRecord: ResourceActionRecord): PersistenceResourceActionRecord = {
- val persistenceResourceActionRecord = new PersistenceResourceActionRecord
- persistenceResourceActionRecord.setId(resourceActionRecord.getId)
- persistenceResourceActionRecord.setLabelValue(resourceActionRecord.getLabelValue)
- persistenceResourceActionRecord.setTicketId(resourceActionRecord.getTicketId)
- persistenceResourceActionRecord.setRequestTimes(resourceActionRecord.getRequestTimes)
- persistenceResourceActionRecord.setRequestResourceAll(ResourceUtils.serializeResource(resourceActionRecord.getRequestResourceAll))
- persistenceResourceActionRecord.setUsedTimes(resourceActionRecord.getUsedTimes)
- persistenceResourceActionRecord.setUsedResourceAll(ResourceUtils.serializeResource(resourceActionRecord.getUsedResourceAll))
- persistenceResourceActionRecord.setReleaseTimes(resourceActionRecord.getReleaseTimes)
- persistenceResourceActionRecord.setReleaseResourceAll(ResourceUtils.serializeResource(resourceActionRecord.getReleaseResourceAll))
- persistenceResourceActionRecord.setCreateTime(resourceActionRecord.getCreateTime)
- persistenceResourceActionRecord.setUpdateTime(resourceActionRecord.getUpdateTime)
- persistenceResourceActionRecord
- }
-
- def recordUserResourceAction(userCreatorEngineType: CombinedLabel, ticketId: String, changeType: String, resource: Resource): Unit = {
- if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue) {
- var persistenceResourceActionRecord = resourceManagerPersistence.getResourceActionRecord(ticketId)
- if(persistenceResourceActionRecord == null) {
- persistenceResourceActionRecord = new PersistenceResourceActionRecord(userCreatorEngineType.getStringValue, ticketId, resource)
- Utils.tryQuietly(resourceManagerPersistence.insertResourceActionRecord(persistenceResourceActionRecord))
- }
- val resourceActionRecord = deserialize(persistenceResourceActionRecord)
- changeType match {
- case ChangeType.ENGINE_REQUEST => {
- resourceActionRecord.setRequestTimes(resourceActionRecord.getRequestTimes + 1)
- resourceActionRecord.setRequestResourceAll(resourceActionRecord.getRequestResourceAll + resource)
- }
- case ChangeType.ENGINE_INIT => {
- resourceActionRecord.setUsedTimes(resourceActionRecord.getUsedTimes + 1)
- resourceActionRecord.setUsedResourceAll(resourceActionRecord.getUsedResourceAll + resource)
- }
- case ChangeType.ENGINE_CLEAR => {
- resourceActionRecord.setReleaseTimes(resourceActionRecord.getReleaseTimes + 1)
- resourceActionRecord.setReleaseResourceAll(resourceActionRecord.getReleaseResourceAll + resource)
- }
- }
- Utils.tryCatch(resourceManagerPersistence.updateResourceActionRecord(serialize(resourceActionRecord))) {
- case exception: Exception => {
- warn(s"ResourceActionRecord failed, ${userCreatorEngineType.getStringValue} with ticketId ${ticketId} after ${changeType}, ${resource}", exception)
- }
- }
+ def recordUserResourceAction(labelContainer: RMLabelContainer, ticketId: String, changeType: String, resource: Resource): Unit = if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue) Utils.tryAndWarn {
+ val userCreatorEngineType: CombinedLabel = labelContainer.getCombinedUserCreatorEngineTypeLabel
+ val engineInstanceLabel: EngineInstanceLabel = labelContainer.getEngineInstanceLabel
+ val eMInstanceLabel = labelContainer.getEMInstanceLabel
+ if (null == userCreatorEngineType) return
+ var ecResourceInfoRecord = ecResourceRecordMapper.getECResourceInfoRecord(ticketId)
+ if (ecResourceInfoRecord == null) {
+ val logDirSuffix = getECLogDirSuffix(labelContainer, ticketId)
+ val user = if (null != labelContainer.getUserCreatorLabel) labelContainer.getUserCreatorLabel.getUser else ""
+ ecResourceInfoRecord = new ECResourceInfoRecord(userCreatorEngineType.getStringValue, user, ticketId, resource, logDirSuffix)
+ ecResourceRecordMapper.insertECResourceInfoRecord(ecResourceInfoRecord)
+ }
+ if (null != engineInstanceLabel) {
+ ecResourceInfoRecord.setServiceInstance(engineInstanceLabel.getInstance)
}
+ if (null != eMInstanceLabel) {
+ ecResourceInfoRecord.setEcmInstance(eMInstanceLabel.getInstance())
+ }
+ changeType match {
+ case ChangeType.ENGINE_REQUEST =>
+ ecResourceInfoRecord.setRequestTimes(ecResourceInfoRecord.getRequestTimes + 1)
+ if (null != resource) {
+ ecResourceInfoRecord.setRequestResource(resource.toJson)
+ }
+ case ChangeType.ENGINE_INIT =>
+ ecResourceInfoRecord.setUsedTimes(ecResourceInfoRecord.getUsedTimes + 1)
+ if (null != resource) {
+ ecResourceInfoRecord.setUsedResource(resource.toJson)
+ }
+ ecResourceInfoRecord.setUsedTime(new Date(System.currentTimeMillis));
+ case ChangeType.ENGINE_CLEAR =>
+ ecResourceInfoRecord.setReleaseTimes(ecResourceInfoRecord.getReleaseTimes + 1)
+ if (null != resource) {
+ ecResourceInfoRecord.setReleasedResource(resource.toJson)
+ }
+ ecResourceInfoRecord.setReleaseTime(new Date(System.currentTimeMillis))
+ }
+ ecResourceRecordMapper.updateECResourceInfoRecord(ecResourceInfoRecord)
}
- def recordUserResourceAction(userCreatorEngineType: CombinedLabel, engineInstanceLabel: EngineInstanceLabel, changeType: String, resource: Resource): Unit = {
- if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue) {
- val instanceResource = labelResourceService.getPersistenceResource(engineInstanceLabel)
- if(instanceResource == null) {
- throw new ResourceWarnException(11005, s"${engineInstanceLabel} resource is null, resource action will not be record")
- }
- recordUserResourceAction(userCreatorEngineType, instanceResource.getTicketId, changeType, resource)
+ def getECLogDirSuffix(labelContainer: RMLabelContainer, ticketId: String): String = {
+ val engineTypeLabel = labelContainer.getEngineTypeLabel
+ val userCreatorLabel = labelContainer.getUserCreatorLabel
+ if (null == engineTypeLabel || null == userCreatorLabel) {
+ return ""
}
+ val suffix = ECPathUtils.getECWOrkDirPathSuffix(userCreatorLabel.getUser, ticketId, engineTypeLabel.getEngineType)
+ suffix + File.separator + "logs"
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/utils/RMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/utils/RMUtils.scala
index ecf8de66f..c191a3bd6 100644
--- a/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/utils/RMUtils.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-resource-manager/src/main/scala/org/apache/linkis/resourcemanager/utils/RMUtils.scala
@@ -58,7 +58,7 @@ object RMUtils extends Logging {
val AM_SERVICE_NAME = "linkis-cg-linkismanager"
- val RM_RESOURCE_ACTION_RECORD = CommonVars("wds.linkis.manager.rm.resource.action.record", false)
+ val RM_RESOURCE_ACTION_RECORD = CommonVars("wds.linkis.manager.rm.resource.action.record", true)
diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
index 49b082c29..cb69cc584 100644
--- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
+++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala
@@ -78,4 +78,6 @@ object GatewayConfiguration {
val THIS_GATEWAY_URL = CommonVars("wds.linkis.gateway.this.url", "")
val THIS_GATEWAY_SCHEMA = CommonVars("wds.linkis.gateway.this.schema", "")
+
+ val ENABLE_WATER_MARK = CommonVars("wds.linkis.web.enable.water.mark", true)
}
diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala
index 486ebea03..6bf440b4c 100644
--- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala
+++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/security/UserRestful.scala
@@ -92,7 +92,7 @@ abstract class AbstractUserRestful extends UserRestful with Logging {
def login(gatewayContext: GatewayContext): Message = {
val message = tryLogin(gatewayContext)
- message.data("sessionTimeOut", SSOUtils.getSessionTimeOut())
+ message.data("sessionTimeOut", SSOUtils.getSessionTimeOut()).data("enableWatermark", GatewayConfiguration.ENABLE_WATER_MARK.getValue)
if (securityHooks != null) securityHooks.foreach(_.postLogin(gatewayContext))
message
}
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index f22dd1be4..4786fe21a 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -109,7 +109,7 @@ This file is divided into 3 sections:
<!-- Cyclomatic complexity of a method-->
<check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
<parameters>
- <parameter name="maximum">20</parameter>
+ <parameter name="maximum">25</parameter>
</parameters>
</check>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org