Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/18 15:17:16 UTC
[incubator-dolphinscheduler] branch dev updated: [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9ae29a7 [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)
9ae29a7 is described below
commit 9ae29a756f0aeed894c80f5e495d786ccf03f41f
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Feb 18 23:17:01 2021 +0800
[Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)
* [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code
---
.../api/service/ResourcesService.java | 1200 +-------------------
.../api/service/SchedulerService.java | 509 +--------
.../api/service/TaskInstanceService.java | 154 +--
.../api/service/impl/AccessTokenServiceImpl.java | 10 +-
.../api/service/impl/DataAnalysisServiceImpl.java | 6 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 19 +-
.../ResourcesServiceImpl.java} | 557 +++++----
.../SchedulerServiceImpl.java} | 48 +-
.../TaskInstanceServiceImpl.java} | 17 +-
.../api/service/impl/TenantServiceImpl.java | 12 +-
.../api/service/impl/UsersServiceImpl.java | 25 +-
.../dolphinscheduler/api/utils/RegexUtils.java | 9 +
.../api/service/ResourcesServiceTest.java | 3 +-
.../api/service/SchedulerServiceTest.java | 3 +-
.../api/service/TaskInstanceServiceTest.java | 3 +-
.../dolphinscheduler/common/utils/StringUtils.java | 8 +-
16 files changed, 423 insertions(+), 2160 deletions(-)
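[Editor's note: the refactoring below is mechanical: each concrete service class becomes a plain
interface, and the Spring wiring (@Service, @Autowired, @Transactional) plus the business logic
move to a new *ServiceImpl class under api/service/impl. A minimal compilable sketch of that
pattern, using hypothetical Greeting* names rather than the real DolphinScheduler classes:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// GreetingService.java -- the extracted contract (analogous to ResourcesService):
// no Spring annotations, no mappers, just method signatures and javadoc.
public interface GreetingService {
    String greet(String name);
}

// GreetingServiceImpl.java -- the implementation (analogous to ResourcesServiceImpl):
// it alone carries @Service and any @Autowired collaborators.
@Service
class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

// Callers keep autowiring by type, but now compile against the interface only,
// so the heavy implementation can change (or be mocked) without touching them.
class GreetingClient {
    @Autowired
    private GreetingService greetingService; // resolved to GreetingServiceImpl at runtime
}
]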
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index e7d8906..bb778dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -14,68 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.api.service;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.commons.collections.BeanMap;
-import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
-import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
-import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
-import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.exceptions.ServiceException;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
-import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.dao.DuplicateKeyException;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
+import org.apache.dolphinscheduler.dao.entity.User;
import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.stream.Collectors;
+import java.util.Map;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.springframework.web.multipart.MultipartFile;
/**
* resources service
*/
-@Service
-public class ResourcesService extends BaseService {
-
- private static final Logger logger = LoggerFactory.getLogger(ResourcesService.class);
-
- @Autowired
- private ResourceMapper resourcesMapper;
-
- @Autowired
- private UdfFuncMapper udfFunctionMapper;
-
- @Autowired
- private TenantMapper tenantMapper;
-
- @Autowired
- private UserMapper userMapper;
-
- @Autowired
- private ResourceUserMapper resourceUserMapper;
-
- @Autowired
- private ProcessDefinitionMapper processDefinitionMapper;
+public interface ResourcesService {
/**
* create directory
@@ -88,74 +43,12 @@ public class ResourcesService extends BaseService {
* @param currentDir current directory
* @return create directory result
*/
- @Transactional(rollbackFor = Exception.class)
- public Result createDirectory(User loginUser,
- String name,
- String description,
- ResourceType type,
- int pid,
- String currentDir) {
- Result result = new Result();
- // if hdfs not startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
- String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
- result = verifyResourceName(fullName,type,loginUser);
- if (!result.getCode().equals(Status.SUCCESS.getCode())) {
- return result;
- }
- if (pid != -1) {
- Resource parentResource = resourcesMapper.selectById(pid);
-
- if (parentResource == null) {
- putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, parentResource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- }
-
-
- if (checkResourceExists(fullName, 0, type.ordinal())) {
- logger.error("resource directory {} has exist, can't recreate", fullName);
- putMsg(result, Status.RESOURCE_EXIST);
- return result;
- }
-
- Date now = new Date();
-
- Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now);
-
- try {
- resourcesMapper.insert(resource);
-
- putMsg(result, Status.SUCCESS);
- Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<String, Object>();
- for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
- if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
- resultMap.put(entry.getKey().toString(), entry.getValue());
- }
- }
- result.setData(resultMap);
- } catch (DuplicateKeyException e) {
- logger.error("resource directory {} has exist, can't recreate", fullName);
- putMsg(result, Status.RESOURCE_EXIST);
- return result;
- } catch (Exception e) {
- logger.error("resource already exists, can't recreate ", e);
- throw new RuntimeException("resource already exists, can't recreate");
- }
- //create directory in hdfs
- createDirecotry(loginUser,fullName,type,result);
- return result;
- }
+ Result<Object> createDirectory(User loginUser,
+ String name,
+ String description,
+ ResourceType type,
+ int pid,
+ String currentDir);
/**
* create resource
@@ -169,121 +62,13 @@ public class ResourcesService extends BaseService {
* @param currentDir current directory
* @return create result code
*/
- @Transactional(rollbackFor = Exception.class)
- public Result createResource(User loginUser,
- String name,
- String desc,
- ResourceType type,
- MultipartFile file,
- int pid,
- String currentDir) {
- Result result = new Result();
-
- // if hdfs not startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- if (pid != -1) {
- Resource parentResource = resourcesMapper.selectById(pid);
-
- if (parentResource == null) {
- putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, parentResource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- }
-
- // file is empty
- if (file.isEmpty()) {
- logger.error("file is empty: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
- return result;
- }
-
- // file suffix
- String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
- String nameSuffix = FileUtils.suffix(name);
-
- // determine file suffix
- if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
- /**
- * rename file suffix and original suffix must be consistent
- */
- logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
- return result;
- }
-
- //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
- if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
- logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
- putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
- return result;
- }
- if (file.getSize() > Constants.MAX_FILE_SIZE) {
- logger.error("file size is too large: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
- return result;
- }
-
- // check resoure name exists
- String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
- if (checkResourceExists(fullName, 0, type.ordinal())) {
- logger.error("resource {} has exist, can't recreate", name);
- putMsg(result, Status.RESOURCE_EXIST);
- return result;
- }
-
- Date now = new Date();
- Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now);
-
- try {
- resourcesMapper.insert(resource);
-
- putMsg(result, Status.SUCCESS);
- Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<>();
- for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
- if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
- resultMap.put(entry.getKey().toString(), entry.getValue());
- }
- }
- result.setData(resultMap);
- } catch (Exception e) {
- logger.error("resource already exists, can't recreate ", e);
- throw new RuntimeException("resource already exists, can't recreate");
- }
-
- // fail upload
- if (!upload(loginUser, fullName, file, type)) {
- logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
- putMsg(result, Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
- }
- return result;
- }
-
- /**
- * check resource is exists
- *
- * @param fullName fullName
- * @param userId user id
- * @param type type
- * @return true if resource exists
- */
- private boolean checkResourceExists(String fullName, int userId, int type ){
-
- List<Resource> resources = resourcesMapper.queryResourceList(fullName, userId, type);
- return resources != null && resources.size() > 0;
- }
-
+ Result<Object> createResource(User loginUser,
+ String name,
+ String desc,
+ ResourceType type,
+ MultipartFile file,
+ int pid,
+ String currentDir);
/**
* update resource
@@ -295,239 +80,12 @@ public class ResourcesService extends BaseService {
* @param file resource file
* @return update result code
*/
- @Transactional(rollbackFor = Exception.class)
- public Result updateResource(User loginUser,
- int resourceId,
- String name,
- String desc,
- ResourceType type,
- MultipartFile file) {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- Resource resource = resourcesMapper.selectById(resourceId);
- if (resource == null) {
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- if (!hasPerm(loginUser, resource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
-
- if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) {
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
- //check resource aleady exists
- String originFullName = resource.getFullName();
- String originResourceName = resource.getAlias();
-
- String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name);
- if (!originResourceName.equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
- logger.error("resource {} already exists, can't recreate", name);
- putMsg(result, Status.RESOURCE_EXIST);
- return result;
- }
-
- if (file != null) {
-
- // file is empty
- if (file.isEmpty()) {
- logger.error("file is empty: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
- return result;
- }
-
- // file suffix
- String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
- String nameSuffix = FileUtils.suffix(name);
-
- // determine file suffix
- if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
- /**
- * rename file suffix and original suffix must be consistent
- */
- logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
- return result;
- }
-
- //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
- if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
- logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
- putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
- return result;
- }
- if (file.getSize() > Constants.MAX_FILE_SIZE) {
- logger.error("file size is too large: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
- return result;
- }
- }
-
- // query tenant by user id
- String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
- return result;
- }
- // verify whether the resource exists in storage
- // get the path of origin file in storage
- String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName);
- try {
- if (!HadoopUtils.getInstance().exists(originHdfsFileName)) {
- logger.error("{} not exist", originHdfsFileName);
- putMsg(result,Status.RESOURCE_NOT_EXIST);
- return result;
- }
- } catch (IOException e) {
- logger.error(e.getMessage(),e);
- throw new ServiceException(Status.HDFS_OPERATION_ERROR);
- }
-
- if (!resource.isDirectory()) {
- //get the origin file suffix
- String originSuffix = FileUtils.suffix(originFullName);
- String suffix = FileUtils.suffix(fullName);
- boolean suffixIsChanged = false;
- if (StringUtils.isBlank(suffix) && StringUtils.isNotBlank(originSuffix)) {
- suffixIsChanged = true;
- }
- if (StringUtils.isNotBlank(suffix) && !suffix.equals(originSuffix)) {
- suffixIsChanged = true;
- }
- //verify whether suffix is changed
- if (suffixIsChanged) {
- //need verify whether this resource is authorized to other users
- Map<String, Object> columnMap = new HashMap<>();
- columnMap.put("resources_id", resourceId);
-
- List<ResourcesUser> resourcesUsers = resourceUserMapper.selectByMap(columnMap);
- if (CollectionUtils.isNotEmpty(resourcesUsers)) {
- List<Integer> userIds = resourcesUsers.stream().map(ResourcesUser::getUserId).collect(Collectors.toList());
- List<User> users = userMapper.selectBatchIds(userIds);
- String userNames = users.stream().map(User::getUserName).collect(Collectors.toList()).toString();
- logger.error("resource is authorized to user {},suffix not allowed to be modified", userNames);
- putMsg(result,Status.RESOURCE_IS_AUTHORIZED,userNames);
- return result;
- }
- }
- }
-
- // updateResource data
- Date now = new Date();
-
- resource.setAlias(name);
- resource.setFullName(fullName);
- resource.setDescription(desc);
- resource.setUpdateTime(now);
- if (file != null) {
- resource.setFileName(file.getOriginalFilename());
- resource.setSize(file.getSize());
- }
-
- try {
- resourcesMapper.updateById(resource);
- if (resource.isDirectory()) {
- List<Integer> childrenResource = listAllChildren(resource,false);
- if (CollectionUtils.isNotEmpty(childrenResource)) {
- String matcherFullName = Matcher.quoteReplacement(fullName);
- List<Resource> childResourceList = new ArrayList<>();
- Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]);
- List<Resource> resourceList = resourcesMapper.listResourceByIds(childResIdArray);
- childResourceList = resourceList.stream().map(t -> {
- t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName));
- t.setUpdateTime(now);
- return t;
- }).collect(Collectors.toList());
- resourcesMapper.batchUpdateResource(childResourceList);
-
- if (ResourceType.UDF.equals(resource.getType())) {
- List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray);
- if (CollectionUtils.isNotEmpty(udfFuncs)) {
- udfFuncs = udfFuncs.stream().map(t -> {
- t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName));
- t.setUpdateTime(now);
- return t;
- }).collect(Collectors.toList());
- udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
- }
- }
- }
- } else if (ResourceType.UDF.equals(resource.getType())) {
- List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId});
- if (CollectionUtils.isNotEmpty(udfFuncs)) {
- udfFuncs = udfFuncs.stream().map(t -> {
- t.setResourceName(fullName);
- t.setUpdateTime(now);
- return t;
- }).collect(Collectors.toList());
- udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
- }
-
- }
-
- putMsg(result, Status.SUCCESS);
- Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<>(5);
- for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
- if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
- resultMap.put(entry.getKey().toString(), entry.getValue());
- }
- }
- result.setData(resultMap);
- } catch (Exception e) {
- logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e);
- throw new ServiceException(Status.UPDATE_RESOURCE_ERROR);
- }
-
- // if name unchanged, return directly without moving on HDFS
- if (originResourceName.equals(name) && file == null) {
- return result;
- }
-
- if (file != null) {
- // fail upload
- if (!upload(loginUser, fullName, file, type)) {
- logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
- putMsg(result, Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
- }
- if (!fullName.equals(originFullName)) {
- try {
- HadoopUtils.getInstance().delete(originHdfsFileName,false);
- } catch (IOException e) {
- logger.error(e.getMessage(),e);
- throw new RuntimeException(String.format("delete resource: %s failed.", originFullName));
- }
- }
- return result;
- }
-
-
- // get the path of dest file in hdfs
- String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
-
-
- try {
- logger.info("start hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
- HadoopUtils.getInstance().copy(originHdfsFileName, destHdfsFileName, true, true);
- } catch (Exception e) {
- logger.error(MessageFormat.format("hdfs copy {0} -> {1} fail", originHdfsFileName, destHdfsFileName), e);
- putMsg(result,Status.HDFS_COPY_FAIL);
- throw new ServiceException(Status.HDFS_COPY_FAIL);
- }
-
- return result;
-
- }
+ Result<Object> updateResource(User loginUser,
+ int resourceId,
+ String name,
+ String desc,
+ ResourceType type,
+ MultipartFile file);
/**
* query resources list paging
@@ -539,99 +97,7 @@ public class ResourcesService extends BaseService {
* @param pageSize page size
* @return resource list page
*/
- public Map<String, Object> queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
-
- HashMap<String, Object> result = new HashMap<>(5);
- Page<Resource> page = new Page(pageNo, pageSize);
- int userId = loginUser.getId();
- if (isAdmin(loginUser)) {
- userId= 0;
- }
- if (direcotryId != -1) {
- Resource directory = resourcesMapper.selectById(direcotryId);
- if (directory == null) {
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- }
-
- IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
- userId,direcotryId, type.ordinal(), searchVal);
- PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
- pageInfo.setTotalCount((int)resourceIPage.getTotal());
- pageInfo.setLists(resourceIPage.getRecords());
- result.put(Constants.DATA_LIST, pageInfo);
- putMsg(result,Status.SUCCESS);
- return result;
- }
-
- /**
- * create direcoty
- * @param loginUser login user
- * @param fullName full name
- * @param type resource type
- * @param result Result
- */
- private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){
- // query tenant
- String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
- String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
- String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode);
- try {
- if (!HadoopUtils.getInstance().exists(resourceRootPath)) {
- createTenantDirIfNotExists(tenantCode);
- }
-
- if (!HadoopUtils.getInstance().mkdir(directoryName)) {
- logger.error("create resource directory {} of hdfs failed",directoryName);
- putMsg(result,Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
- }
- } catch (Exception e) {
- logger.error("create resource directory {} of hdfs failed",directoryName);
- putMsg(result,Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
- }
- }
-
- /**
- * upload file to hdfs
- *
- * @param loginUser login user
- * @param fullName full name
- * @param file file
- */
- private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) {
- // save to local
- String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
- String nameSuffix = FileUtils.suffix(fullName);
-
- // determine file suffix
- if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
- return false;
- }
- // query tenant
- String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
- // random file name
- String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
-
-
- // save file to hdfs, and delete original file
- String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
- String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode);
- try {
- // if tenant dir not exists
- if (!HadoopUtils.getInstance().exists(resourcePath)) {
- createTenantDirIfNotExists(tenantCode);
- }
- org.apache.dolphinscheduler.api.utils.FileUtils.copyFile(file, localFilename);
- HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- return false;
- }
- return true;
- }
+ Map<String, Object> queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize);
/**
* query resource list
@@ -640,21 +106,7 @@ public class ResourcesService extends BaseService {
* @param type resource type
* @return resource list
*/
- public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
-
- Map<String, Object> result = new HashMap<>(5);
-
- int userId = loginUser.getId();
- if(isAdmin(loginUser)){
- userId = 0;
- }
- List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
- Visitor resourceTreeVisitor = new ResourceTreeVisitor(allResourceList);
- result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
- putMsg(result,Status.SUCCESS);
-
- return result;
- }
+ Map<String, Object> queryResourceList(User loginUser, ResourceType type);
/**
* query resource list by program type
@@ -663,33 +115,7 @@ public class ResourcesService extends BaseService {
* @param type resource type
* @return resource list
*/
- public Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
-
- Map<String, Object> result = new HashMap<>(5);
- String suffix = ".jar";
- int userId = loginUser.getId();
- if(isAdmin(loginUser)){
- userId = 0;
- }
- if (programType != null) {
- switch (programType) {
- case JAVA:
- break;
- case SCALA:
- break;
- case PYTHON:
- suffix = ".py";
- break;
- }
- }
- List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
- List<Resource> resources = new ResourceFilter(suffix,new ArrayList<>(allResourceList)).filter();
- Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
- result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
- putMsg(result,Status.SUCCESS);
-
- return result;
- }
+ Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType);
/**
* delete resource
@@ -697,82 +123,9 @@ public class ResourcesService extends BaseService {
* @param loginUser login user
* @param resourceId resource id
* @return delete result code
- * @throws Exception exception
+ * @throws IOException exception
*/
- @Transactional(rollbackFor = Exception.class)
- public Result delete(User loginUser, int resourceId) throws Exception {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- //get resource and hdfs path
- Resource resource = resourcesMapper.selectById(resourceId);
- if (resource == null) {
- logger.error("resource file not exist, resource id {}", resourceId);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- if (!hasPerm(loginUser, resource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
-
- String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
- return result;
- }
-
- // get all resource id of process definitions those is released
- List<Map<String, Object>> list = processDefinitionMapper.listResources();
- Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
- Set<Integer> resourceIdSet = resourceProcessMap.keySet();
- // get all children of the resource
- List<Integer> allChildren = listAllChildren(resource,true);
- Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);
-
- //if resource type is UDF,need check whether it is bound by UDF functon
- if (resource.getType() == (ResourceType.UDF)) {
- List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
- if (CollectionUtils.isNotEmpty(udfFuncs)) {
- logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
- putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
- return result;
- }
- }
-
- if (resourceIdSet.contains(resource.getPid())) {
- logger.error("can't be deleted,because it is used of process definition");
- putMsg(result, Status.RESOURCE_IS_USED);
- return result;
- }
- resourceIdSet.retainAll(allChildren);
- if (CollectionUtils.isNotEmpty(resourceIdSet)) {
- logger.error("can't be deleted,because it is used of process definition");
- for (Integer resId : resourceIdSet) {
- logger.error("resource id:{} is used of process definition {}",resId,resourceProcessMap.get(resId));
- }
- putMsg(result, Status.RESOURCE_IS_USED);
- return result;
- }
-
- // get hdfs file by type
- String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
-
- //delete data in database
- resourcesMapper.deleteIds(needDeleteResourceIdArray);
- resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);
-
- //delete file on hdfs
- HadoopUtils.getInstance().delete(hdfsFilename, true);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
+ Result<Object> delete(User loginUser, int resourceId) throws IOException;
/**
* verify resource by name and type
@@ -781,37 +134,7 @@ public class ResourcesService extends BaseService {
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
- public Result verifyResourceName(String fullName, ResourceType type,User loginUser) {
- Result result = new Result();
- putMsg(result, Status.SUCCESS);
- if (checkResourceExists(fullName, 0, type.ordinal())) {
- logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName);
- putMsg(result, Status.RESOURCE_EXIST);
- } else {
- // query tenant
- Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
- if(tenant != null){
- String tenantCode = tenant.getTenantCode();
-
- try {
- String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
- if(HadoopUtils.getInstance().exists(hdfsFilename)){
- logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename);
- putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
- }
-
- } catch (Exception e) {
- logger.error(e.getMessage(),e);
- putMsg(result,Status.HDFS_OPERATION_ERROR);
- }
- }else{
- putMsg(result,Status.TENANT_NOT_EXIST);
- }
- }
-
-
- return result;
- }
+ Result<Object> verifyResourceName(String fullName, ResourceType type,User loginUser);
/**
* verify resource by full name or pid and type
@@ -820,40 +143,7 @@ public class ResourcesService extends BaseService {
* @param type resource type
* @return true if the resource full name or pid not exists, otherwise return false
*/
- public Result queryResource(String fullName,Integer id,ResourceType type) {
- Result result = new Result();
- if (StringUtils.isBlank(fullName) && id == null) {
- logger.error("You must input one of fullName and pid");
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
- return result;
- }
- if (StringUtils.isNotBlank(fullName)) {
- List<Resource> resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
- if (CollectionUtils.isEmpty(resourceList)) {
- logger.error("resource file not exist, resource full name {} ", fullName);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- putMsg(result, Status.SUCCESS);
- result.setData(resourceList.get(0));
- } else {
- Resource resource = resourcesMapper.selectById(id);
- if (resource == null) {
- logger.error("resource file not exist, resource id {}", id);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- Resource parentResource = resourcesMapper.selectById(resource.getPid());
- if (parentResource == null) {
- logger.error("parent resource file not exist, resource id {}", id);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- putMsg(result, Status.SUCCESS);
- result.setData(parentResource);
- }
- return result;
- }
+ Result<Object> queryResource(String fullName,Integer id,ResourceType type);
/**
* view resource file online
@@ -863,64 +153,7 @@ public class ResourcesService extends BaseService {
* @param limit limit
* @return resource content
*/
- public Result readResource(int resourceId, int skipLineNum, int limit) {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- // get resource by id
- Resource resource = resourcesMapper.selectById(resourceId);
- if (resource == null) {
- logger.error("resource file not exist, resource id {}", resourceId);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- //check preview or not by file suffix
- String nameSuffix = FileUtils.suffix(resource.getAlias());
- String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
- if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
- List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
- if (!strList.contains(nameSuffix)) {
- logger.error("resource suffix {} not support view, resource id {}", nameSuffix, resourceId);
- putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
- return result;
- }
- }
-
- String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
- return result;
- }
-
- // hdfs path
- String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
- logger.info("resource hdfs path is {} ", hdfsFileName);
- try {
- if(HadoopUtils.getInstance().exists(hdfsFileName)){
- List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
-
- putMsg(result, Status.SUCCESS);
- Map<String, Object> map = new HashMap<>();
- map.put(ALIAS, resource.getAlias());
- map.put(CONTENT, String.join("\n", content));
- result.setData(map);
- }else{
- logger.error("read file {} not exist in hdfs", hdfsFileName);
- putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName);
- }
-
- } catch (Exception e) {
- logger.error("Resource {} read failed", hdfsFileName, e);
- putMsg(result, Status.HDFS_OPERATION_ERROR);
- }
-
- return result;
- }
+ Result<Object> readResource(int resourceId, int skipLineNum, int limit);
/**
* create resource file online
@@ -933,73 +166,7 @@ public class ResourcesService extends BaseService {
* @param content content
* @return create result code
*/
- @Transactional(rollbackFor = Exception.class)
- public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
- Result result = new Result();
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- //check file suffix
- String nameSuffix = fileSuffix.trim();
- String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
- if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
- List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
- if (!strList.contains(nameSuffix)) {
- logger.error("resouce suffix {} not support create", nameSuffix);
- putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
- return result;
- }
- }
-
- String name = fileName.trim() + "." + nameSuffix;
- String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
-
- result = verifyResourceName(fullName,type,loginUser);
- if (!result.getCode().equals(Status.SUCCESS.getCode())) {
- return result;
- }
- if (pid != -1) {
- Resource parentResource = resourcesMapper.selectById(pid);
-
- if (parentResource == null) {
- putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, parentResource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- }
-
- // save data
- Date now = new Date();
- Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
-
- resourcesMapper.insert(resource);
-
- putMsg(result, Status.SUCCESS);
- Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<>();
- for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
- if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
- resultMap.put(entry.getKey().toString(), entry.getValue());
- }
- }
- result.setData(resultMap);
-
- String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
-
- result = uploadContentToHdfs(fullName, tenantCode, content);
- if (!result.getCode().equals(Status.SUCCESS.getCode())) {
- throw new RuntimeException(result.getMsg());
- }
- return result;
- }
+ Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory);
/**
* updateProcessInstance resource
@@ -1008,145 +175,16 @@ public class ResourcesService extends BaseService {
* @param content content
* @return update result cod
*/
- @Transactional(rollbackFor = Exception.class)
- public Result updateResourceContent(int resourceId, String content) {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- Resource resource = resourcesMapper.selectById(resourceId);
- if (resource == null) {
- logger.error("read file not exist, resource id {}", resourceId);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- //check can edit by file suffix
- String nameSuffix = FileUtils.suffix(resource.getAlias());
- String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
- if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
- List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
- if (!strList.contains(nameSuffix)) {
- logger.error("resource suffix {} not support updateProcessInstance, resource id {}", nameSuffix, resourceId);
- putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
- return result;
- }
- }
-
- String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
- return result;
- }
- resource.setSize(content.getBytes().length);
- resource.setUpdateTime(new Date());
- resourcesMapper.updateById(resource);
-
-
- result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
- if (!result.getCode().equals(Status.SUCCESS.getCode())) {
- throw new RuntimeException(result.getMsg());
- }
- return result;
- }
-
- /**
- * @param resourceName resource name
- * @param tenantCode tenant code
- * @param content content
- * @return result
- */
- private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) {
- Result result = new Result();
- String localFilename = "";
- String hdfsFileName = "";
- try {
- localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
-
- if (!FileUtils.writeContent2File(content, localFilename)) {
- // write file fail
- logger.error("file {} fail, content is {}", localFilename, content);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
-
- // get resource file hdfs path
- hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
- String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
- logger.info("resource hdfs path is {} ", hdfsFileName);
-
- HadoopUtils hadoopUtils = HadoopUtils.getInstance();
- if (!hadoopUtils.exists(resourcePath)) {
- // create if tenant dir not exists
- createTenantDirIfNotExists(tenantCode);
- }
- if (hadoopUtils.exists(hdfsFileName)) {
- hadoopUtils.delete(hdfsFileName, false);
- }
-
- hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
- result.setMsg(String.format("copy %s to hdfs %s fail", localFilename, hdfsFileName));
- return result;
- }
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
+ Result<Object> updateResourceContent(int resourceId, String content);
/**
* download file
*
* @param resourceId resource id
* @return resource content
- * @throws Exception exception
+ * @throws IOException exception
*/
- public org.springframework.core.io.Resource downloadResource(int resourceId) throws Exception {
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- throw new RuntimeException("hdfs not startup");
- }
-
- Resource resource = resourcesMapper.selectById(resourceId);
- if (resource == null) {
- logger.error("download file not exist, resource id {}", resourceId);
- return null;
- }
- if (resource.isDirectory()) {
- logger.error("resource id {} is directory,can't download it", resourceId);
- throw new RuntimeException("cant't download directory");
- }
-
- int userId = resource.getUserId();
- User user = userMapper.selectById(userId);
- if(user == null){
- logger.error("user id {} not exists", userId);
- throw new RuntimeException(String.format("resource owner id %d not exist",userId));
- }
-
- Tenant tenant = tenantMapper.queryById(user.getTenantId());
- if(tenant == null){
- logger.error("tenant id {} not exists", user.getTenantId());
- throw new RuntimeException(String.format("The tenant id %d of resource owner not exist",user.getTenantId()));
- }
-
- String tenantCode = tenant.getTenantCode();
-
- String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
-
- String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
- logger.info("resource hdfs path is {} ", hdfsFileName);
-
- HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true);
- return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
- }
-
+ org.springframework.core.io.Resource downloadResource(int resourceId) throws IOException;
/**
* list all file
@@ -1155,25 +193,7 @@ public class ResourcesService extends BaseService {
* @param userId user id
* @return unauthorized result code
*/
- public Map<String, Object> authorizeResourceTree(User loginUser, Integer userId) {
-
- Map<String, Object> result = new HashMap<>();
- if (isNotAdmin(loginUser, result)) {
- return result;
- }
- List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
- List<ResourceComponent> list;
- if (CollectionUtils.isNotEmpty(resourceList)) {
- Visitor visitor = new ResourceTreeVisitor(resourceList);
- list = visitor.visit().getChildren();
- } else {
- list = new ArrayList<>(0);
- }
-
- result.put(Constants.DATA_LIST, list);
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> authorizeResourceTree(User loginUser, Integer userId);
/**
* unauthorized file
@@ -1182,28 +202,7 @@ public class ResourcesService extends BaseService {
* @param userId user id
* @return unauthorized result code
*/
- public Map<String, Object> unauthorizedFile(User loginUser, Integer userId) {
-
- Map<String, Object> result = new HashMap<>();
- if (isNotAdmin(loginUser, result)) {
- return result;
- }
- List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
- List<Resource> list;
- if (resourceList != null && resourceList.size() > 0) {
- Set<Resource> resourceSet = new HashSet<>(resourceList);
- List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
-
- getAuthorizedResourceList(resourceSet, authedResourceList);
- list = new ArrayList<>(resourceSet);
- } else {
- list = new ArrayList<>(0);
- }
- Visitor visitor = new ResourceTreeVisitor(list);
- result.put(Constants.DATA_LIST, visitor.visit().getChildren());
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> unauthorizedFile(User loginUser, Integer userId);
/**
* unauthorized udf function
@@ -1212,29 +211,7 @@ public class ResourcesService extends BaseService {
* @param userId user id
* @return unauthorized result code
*/
- public Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId) {
- Map<String, Object> result = new HashMap<>(5);
- //only admin can operate
- if (isNotAdmin(loginUser, result)) {
- return result;
- }
-
- List<UdfFunc> udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId);
- List<UdfFunc> resultList = new ArrayList<>();
- Set<UdfFunc> udfFuncSet = null;
- if (CollectionUtils.isNotEmpty(udfFuncList)) {
- udfFuncSet = new HashSet<>(udfFuncList);
-
- List<UdfFunc> authedUDFFuncList = udfFunctionMapper.queryAuthedUdfFunc(userId);
-
- getAuthorizedResourceList(udfFuncSet, authedUDFFuncList);
- resultList = new ArrayList<>(udfFuncSet);
- }
- result.put(Constants.DATA_LIST, resultList);
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
+ Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId);
/**
* authorized udf function
@@ -1243,17 +220,7 @@ public class ResourcesService extends BaseService {
* @param userId user id
* @return authorized result code
*/
- public Map<String, Object> authorizedUDFFunction(User loginUser, Integer userId) {
- Map<String, Object> result = new HashMap<>();
- if (isNotAdmin(loginUser, result)) {
- return result;
- }
- List<UdfFunc> udfFuncs = udfFunctionMapper.queryAuthedUdfFunc(userId);
- result.put(Constants.DATA_LIST, udfFuncs);
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
+ Map<String, Object> authorizedUDFFunction(User loginUser, Integer userId);
/**
* authorized file
@@ -1262,91 +229,6 @@ public class ResourcesService extends BaseService {
* @param userId user id
* @return authorized result
*/
- public Map<String, Object> authorizedFile(User loginUser, Integer userId) {
- Map<String, Object> result = new HashMap<>(5);
- if (isNotAdmin(loginUser, result)) {
- return result;
- }
- List<Resource> authedResources = resourcesMapper.queryAuthorizedResourceList(userId);
- Visitor visitor = new ResourceTreeVisitor(authedResources);
- String visit = JSONUtils.toJsonString(visitor.visit(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
- logger.info(visit);
- String jsonTreeStr = JSONUtils.toJsonString(visitor.visit().getChildren(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
- logger.info(jsonTreeStr);
- result.put(Constants.DATA_LIST, visitor.visit().getChildren());
- putMsg(result,Status.SUCCESS);
- return result;
- }
-
- /**
- * get authorized resource list
- *
- * @param resourceSet resource set
- * @param authedResourceList authorized resource list
- */
- private void getAuthorizedResourceList(Set<?> resourceSet, List<?> authedResourceList) {
- Set<?> authedResourceSet = null;
- if (CollectionUtils.isNotEmpty(authedResourceList)) {
- authedResourceSet = new HashSet<>(authedResourceList);
- resourceSet.removeAll(authedResourceSet);
- }
- }
-
- /**
- * get tenantCode by UserId
- *
- * @param userId user id
- * @param result return result
- * @return
- */
- private String getTenantCode(int userId,Result result){
-
- User user = userMapper.selectById(userId);
- if (user == null) {
- logger.error("user {} not exists", userId);
- putMsg(result, Status.USER_NOT_EXIST,userId);
- return null;
- }
-
- Tenant tenant = tenantMapper.queryById(user.getTenantId());
- if (tenant == null){
- logger.error("tenant not exists");
- putMsg(result, Status.TENANT_NOT_EXIST);
- return null;
- }
- return tenant.getTenantCode();
- }
-
- /**
- * list all children id
- * @param resource resource
- * @param containSelf whether add self to children list
- * @return all children id
- */
- List<Integer> listAllChildren(Resource resource,boolean containSelf){
- List<Integer> childList = new ArrayList<>();
- if (resource.getId() != -1 && containSelf) {
- childList.add(resource.getId());
- }
-
- if(resource.isDirectory()){
- listAllChildren(resource.getId(),childList);
- }
- return childList;
- }
-
- /**
- * list all children id
- * @param resourceId resource id
- * @param childList child list
- */
- void listAllChildren(int resourceId,List<Integer> childList){
-
- List<Integer> children = resourcesMapper.listChildren(resourceId);
- for(int chlidId:children){
- childList.add(chlidId);
- listAllChildren(chlidId,childList);
- }
- }
+ Map<String, Object> authorizedFile(User loginUser, Integer userId);
}
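[Editor's note: with ResourcesService reduced to an interface, callers can be exercised against a
mock instead of a live HDFS/database stack. A hypothetical JUnit 4 + Mockito sketch of that
benefit; it assumes Result's no-arg constructor and the setCode/getCode accessors used elsewhere
in this diff, a default User constructor, and a ResourcesServiceCallerTest class that is not part
of this commit:

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;

public class ResourcesServiceCallerTest {

    @Test
    public void callerCompilesAndRunsAgainstTheInterfaceOnly() {
        // No @Service bean, no HDFS: a plain Mockito mock satisfies the contract.
        ResourcesService resourcesService = mock(ResourcesService.class);

        Result<Object> ok = new Result<>();
        ok.setCode(Status.SUCCESS.getCode()); // mirrors result.setCode(...) usage in this diff

        // Stub the createDirectory signature declared by the new interface:
        // (loginUser, name, description, type, pid, currentDir).
        when(resourcesService.createDirectory(any(User.class), eq("logs"), anyString(),
                eq(ResourceType.FILE), eq(-1), eq("/")))
                .thenReturn(ok);

        Result<Object> result = resourcesService.createDirectory(
                new User(), "logs", "log directory", ResourceType.FILE, -1, "/");
        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
    }
}
]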
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
index 55880ad..18f3ebf 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
@@ -17,77 +17,18 @@
package org.apache.dolphinscheduler.api.service;
-import org.apache.dolphinscheduler.api.dto.ScheduleParam;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.exceptions.ServiceException;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
-import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.quartz.CronExpression;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-
/**
* scheduler service
*/
-@Service
-public class SchedulerService extends BaseService {
-
- private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class);
-
- @Autowired
- private ProjectService projectService;
-
- @Autowired
- private ExecutorService executorService;
-
- @Autowired
- private MonitorService monitorService;
-
- @Autowired
- private ProcessService processService;
-
- @Autowired
- private ScheduleMapper scheduleMapper;
-
- @Autowired
- private ProjectMapper projectMapper;
-
- @Autowired
- private ProcessDefinitionMapper processDefinitionMapper;
+public interface SchedulerService {
/**
* save schedule
@@ -103,80 +44,14 @@ public class SchedulerService extends BaseService {
* @param workerGroup worker group
* @return create result code
*/
- @Transactional(rollbackFor = RuntimeException.class)
- public Map<String, Object> insertSchedule(User loginUser, String projectName,
- Integer processDefineId,
- String schedule,
- WarningType warningType,
- int warningGroupId,
- FailureStrategy failureStrategy,
- Priority processInstancePriority,
- String workerGroup) {
-
- Map<String, Object> result = new HashMap();
-
- Project project = projectMapper.queryByName(projectName);
-
- // check project auth
- boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
- if (!hasProjectAndPerm) {
- return result;
- }
-
- // check work flow define release state
- ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
- result = executorService.checkProcessDefinitionValid(processDefinition, processDefineId);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
-
- Schedule scheduleObj = new Schedule();
- Date now = new Date();
-
- scheduleObj.setProjectName(projectName);
- scheduleObj.setProcessDefinitionId(processDefinition.getId());
- scheduleObj.setProcessDefinitionName(processDefinition.getName());
-
- ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
- if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
- logger.warn("The start time must not be the same as the end");
- putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
- return result;
- }
- scheduleObj.setStartTime(scheduleParam.getStartTime());
- scheduleObj.setEndTime(scheduleParam.getEndTime());
- if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
- logger.error(scheduleParam.getCrontab() + " verify failure");
-
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
- return result;
- }
- scheduleObj.setCrontab(scheduleParam.getCrontab());
- scheduleObj.setWarningType(warningType);
- scheduleObj.setWarningGroupId(warningGroupId);
- scheduleObj.setFailureStrategy(failureStrategy);
- scheduleObj.setCreateTime(now);
- scheduleObj.setUpdateTime(now);
- scheduleObj.setUserId(loginUser.getId());
- scheduleObj.setUserName(loginUser.getUserName());
- scheduleObj.setReleaseState(ReleaseState.OFFLINE);
- scheduleObj.setProcessInstancePriority(processInstancePriority);
- scheduleObj.setWorkerGroup(workerGroup);
- scheduleMapper.insert(scheduleObj);
-
- /**
- * updateProcessInstance receivers and cc by process definition id
- */
- processDefinition.setWarningGroupId(warningGroupId);
- processDefinitionMapper.updateById(processDefinition);
-
- // return scheduler object with ID
- result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId()));
- putMsg(result, Status.SUCCESS);
-
- result.put("scheduleId", scheduleObj.getId());
- return result;
- }
+ Map<String, Object> insertSchedule(User loginUser, String projectName,
+ Integer processDefineId,
+ String schedule,
+ WarningType warningType,
+ int warningGroupId,
+ FailureStrategy failureStrategy,
+ Priority processInstancePriority,
+ String workerGroup);
/**
* updateProcessInstance schedule
@@ -193,95 +68,16 @@ public class SchedulerService extends BaseService {
* @param scheduleStatus schedule status
* @return update result code
*/
- @Transactional(rollbackFor = RuntimeException.class)
- public Map<String, Object> updateSchedule(User loginUser,
- String projectName,
- Integer id,
- String scheduleExpression,
- WarningType warningType,
- int warningGroupId,
- FailureStrategy failureStrategy,
- ReleaseState scheduleStatus,
- Priority processInstancePriority,
- String workerGroup) {
- Map<String, Object> result = new HashMap<String, Object>(5);
-
- Project project = projectMapper.queryByName(projectName);
-
- // check project auth
- boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
- if (!hasProjectAndPerm) {
- return result;
- }
-
- // check schedule exists
- Schedule schedule = scheduleMapper.selectById(id);
-
- if (schedule == null) {
- putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
- return result;
- }
-
- ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
- if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
- return result;
- }
-
- /**
- * scheduling on-line status forbid modification
- */
- if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
- return result;
- }
-
- Date now = new Date();
-
- // updateProcessInstance param
- if (StringUtils.isNotEmpty(scheduleExpression)) {
- ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
- if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
- logger.warn("The start time must not be the same as the end");
- putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
- return result;
- }
- schedule.setStartTime(scheduleParam.getStartTime());
- schedule.setEndTime(scheduleParam.getEndTime());
- if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
- putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
- return result;
- }
- schedule.setCrontab(scheduleParam.getCrontab());
- }
-
- if (warningType != null) {
- schedule.setWarningType(warningType);
- }
-
- schedule.setWarningGroupId(warningGroupId);
-
- if (failureStrategy != null) {
- schedule.setFailureStrategy(failureStrategy);
- }
-
- if (scheduleStatus != null) {
- schedule.setReleaseState(scheduleStatus);
- }
- schedule.setWorkerGroup(workerGroup);
- schedule.setUpdateTime(now);
- schedule.setProcessInstancePriority(processInstancePriority);
- scheduleMapper.updateById(schedule);
-
- /**
- * updateProcessInstance recipients and cc by process definition ID
- */
- processDefinition.setWarningGroupId(warningGroupId);
-
- processDefinitionMapper.updateById(processDefinition);
-
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> updateSchedule(User loginUser,
+ String projectName,
+ Integer id,
+ String scheduleExpression,
+ WarningType warningType,
+ int warningGroupId,
+ FailureStrategy failureStrategy,
+ ReleaseState scheduleStatus,
+ Priority processInstancePriority,
+ String workerGroup);
/**
@@ -293,110 +89,10 @@ public class SchedulerService extends BaseService {
* @param scheduleStatus schedule status
* @return publish result code
*/
- @Transactional(rollbackFor = RuntimeException.class)
- public Map<String, Object> setScheduleState(User loginUser,
- String projectName,
- Integer id,
- ReleaseState scheduleStatus) {
-
- Map<String, Object> result = new HashMap<String, Object>(5);
-
- Project project = projectMapper.queryByName(projectName);
- // check project auth
- boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
- if (!hasProjectAndPerm) {
- return result;
- }
-
- // check schedule exists
- Schedule scheduleObj = scheduleMapper.selectById(id);
-
- if (scheduleObj == null) {
- putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
- return result;
- }
- // check schedule release state
- if (scheduleObj.getReleaseState() == scheduleStatus) {
- logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
- scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
- putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
- return result;
- }
- ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
- if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
- return result;
- }
-
- if (scheduleStatus == ReleaseState.ONLINE) {
- // check process definition release state
- if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
- logger.info("not release process definition id: {} , name : {}",
- processDefinition.getId(), processDefinition.getName());
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
- return result;
- }
- // check sub process definition release state
- List<Integer> subProcessDefineIds = new ArrayList<>();
- processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
- Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
- if (subProcessDefineIds.size() > 0) {
- List<ProcessDefinition> subProcessDefinitionList =
- processDefinitionMapper.queryDefinitionListByIdList(idArray);
- if (subProcessDefinitionList != null && subProcessDefinitionList.size() > 0) {
- for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
- /**
- * if there is no online process, exit directly
- */
- if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
- logger.info("not release process definition id: {} , name : {}",
- subProcessDefinition.getId(), subProcessDefinition.getName());
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
- return result;
- }
- }
- }
- }
- }
-
- // check master server exists
- List<Server> masterServers = monitorService.getServerListFromZK(true);
-
- if (masterServers.size() == 0) {
- putMsg(result, Status.MASTER_NOT_EXISTS);
- return result;
- }
-
- // set status
- scheduleObj.setReleaseState(scheduleStatus);
-
- scheduleMapper.updateById(scheduleObj);
-
- try {
- switch (scheduleStatus) {
- case ONLINE: {
- logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
- setSchedule(project.getId(), scheduleObj);
- break;
- }
- case OFFLINE: {
- logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
- deleteSchedule(project.getId(), id);
- break;
- }
- default: {
- putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
- return result;
- }
- }
- } catch (Exception e) {
- result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
- throw new ServiceException(result.get(Constants.MSG).toString());
- }
-
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> setScheduleState(User loginUser,
+ String projectName,
+ Integer id,
+ ReleaseState scheduleStatus);
/**
* query schedule
@@ -409,36 +105,7 @@ public class SchedulerService extends BaseService {
* @param searchVal search value
* @return schedule list page
*/
- public Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize) {
-
- HashMap<String, Object> result = new HashMap<>();
-
- Project project = projectMapper.queryByName(projectName);
-
- // check project auth
- boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
- if (!hasProjectAndPerm) {
- return result;
- }
-
- ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
- if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
- return result;
- }
- Page<Schedule> page = new Page(pageNo, pageSize);
- IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(
- page, processDefineId, searchVal
- );
-
- PageInfo pageInfo = new PageInfo<Schedule>(pageNo, pageSize);
- pageInfo.setTotalCount((int) scheduleIPage.getTotal());
- pageInfo.setLists(scheduleIPage.getRecords());
- result.put(Constants.DATA_LIST, pageInfo);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
+ Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize);
/**
* query schedule list
@@ -447,41 +114,7 @@ public class SchedulerService extends BaseService {
* @param projectName project name
* @return schedule list
*/
- public Map<String, Object> queryScheduleList(User loginUser, String projectName) {
- Map<String, Object> result = new HashMap<>();
- Project project = projectMapper.queryByName(projectName);
-
- // check project auth
- boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
- if (!hasProjectAndPerm) {
- return result;
- }
-
- List<Schedule> schedules = scheduleMapper.querySchedulerListByProjectName(projectName);
-
- result.put(Constants.DATA_LIST, schedules);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
-
- public void setSchedule(int projectId, Schedule schedule) {
-
- int scheduleId = schedule.getId();
- logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
-
- Date startDate = schedule.getStartTime();
- Date endDate = schedule.getEndTime();
-
- String jobName = QuartzExecutors.buildJobName(scheduleId);
- String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
-
- Map<String, Object> dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule);
-
- QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,
- schedule.getCrontab(), dataMap);
-
- }
+ Map<String, Object> queryScheduleList(User loginUser, String projectName);
/**
* delete schedule
@@ -490,35 +123,7 @@ public class SchedulerService extends BaseService {
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
- public static void deleteSchedule(int projectId, int scheduleId) {
- logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
-
- String jobName = QuartzExecutors.buildJobName(scheduleId);
- String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
-
- if (!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)) {
- logger.warn("set offline failure:projectId:{},scheduleId:{}", projectId, scheduleId);
- throw new ServiceException("set offline failure");
- }
-
- }
-
- /**
- * check valid
- *
- * @param result result
- * @param bool bool
- * @param status status
- * @return check result code
- */
- private boolean checkValid(Map<String, Object> result, boolean bool, Status status) {
- // timeout is valid
- if (bool) {
- putMsg(result, status);
- return true;
- }
- return false;
- }
+ void deleteSchedule(int projectId, int scheduleId);
/**
* delete schedule by id
@@ -528,46 +133,7 @@ public class SchedulerService extends BaseService {
* @param scheduleId schedule id
* @return delete result code
*/
- public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) {
-
- Map<String, Object> result = new HashMap<>();
- Project project = projectMapper.queryByName(projectName);
-
- Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
- Status resultEnum = (Status) checkResult.get(Constants.STATUS);
- if (resultEnum != Status.SUCCESS) {
- return checkResult;
- }
-
- Schedule schedule = scheduleMapper.selectById(scheduleId);
-
- if (schedule == null) {
- putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
- return result;
- }
-
- // Determine if the login user is the owner of the schedule
- if (loginUser.getId() != schedule.getUserId()
- && loginUser.getUserType() != UserType.ADMIN_USER) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
-
- // check schedule is already online
- if (schedule.getReleaseState() == ReleaseState.ONLINE) {
- putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
- return result;
- }
-
- int delete = scheduleMapper.deleteById(scheduleId);
-
- if (delete > 0) {
- putMsg(result, Status.SUCCESS);
- } else {
- putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
- }
- return result;
- }
+ Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId);
/**
* preview schedule
@@ -577,24 +143,5 @@ public class SchedulerService extends BaseService {
* @param schedule schedule expression
* @return the next five fire times
*/
- public Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule) {
- Map<String, Object> result = new HashMap<>();
- CronExpression cronExpression;
- ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
- Date now = new Date();
-
- Date startTime = now.after(scheduleParam.getStartTime()) ? now : scheduleParam.getStartTime();
- Date endTime = scheduleParam.getEndTime();
- try {
- cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab());
- } catch (ParseException e) {
- logger.error(e.getMessage(), e);
- putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
- return result;
- }
- List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
- result.put(Constants.DATA_LIST, selfFireDateList.stream().map(t -> DateUtils.dateToString(t)));
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule);
}
\ No newline at end of file
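The net effect in this file is the interface-extraction pattern the commit applies everywhere: the public contract stays in SchedulerService, while the bodies, including the Quartz helpers setSchedule and deleteSchedule, move to a Spring-managed implementation class. Note that deleteSchedule also loses its static modifier, which is what forces the call-site change in ProcessDefinitionServiceImpl further down. A condensed sketch of the resulting split, with bodies elided (a sketch of the pattern, not the full commit content):

// SchedulerService.java: the contract callers compile against
package org.apache.dolphinscheduler.api.service;

public interface SchedulerService {
    void deleteSchedule(int projectId, int scheduleId);
}

// SchedulerServiceImpl.java: the Spring bean that keeps the original logic
package org.apache.dolphinscheduler.api.service.impl;

import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.springframework.stereotype.Service;

@Service
public class SchedulerServiceImpl extends BaseService implements SchedulerService {

    @Override
    public void deleteSchedule(int projectId, int scheduleId) {
        // the original QuartzExecutors.deleteJob(...) logic moves here unchanged
    }
}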
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index 6c68202..cbbc89b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -17,57 +17,15 @@
package org.apache.dolphinscheduler.api.service;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.text.MessageFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task instance service
*/
-@Service
-public class TaskInstanceService extends BaseService {
-
- @Autowired
- ProjectMapper projectMapper;
-
- @Autowired
- ProjectService projectService;
-
- @Autowired
- ProcessService processService;
-
- @Autowired
- TaskInstanceMapper taskInstanceMapper;
-
- @Autowired
- ProcessInstanceService processInstanceService;
-
- @Autowired
- UsersService usersService;
+public interface TaskInstanceService {
/**
* query task instance list by project, process instance, task name, start/end time, state and keyword, with paging
@@ -85,65 +43,10 @@ public class TaskInstanceService extends BaseService {
* @param pageSize page size
* @return task list page
*/
- public Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
- Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
- String endDate, String searchVal, ExecutionStatus stateType, String host,
- Integer pageNo, Integer pageSize) {
- Map<String, Object> result = new HashMap<>();
- Project project = projectMapper.queryByName(projectName);
-
- Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
- Status status = (Status) checkResult.get(Constants.STATUS);
- if (status != Status.SUCCESS) {
- return checkResult;
- }
-
- int[] statusArray = null;
- if (stateType != null) {
- statusArray = new int[]{stateType.ordinal()};
- }
-
- Date start = null;
- Date end = null;
- if (StringUtils.isNotEmpty(startDate)) {
- start = DateUtils.getScheduleDate(startDate);
- if (start == null) {
- return generateInvalidParamRes(result, "startDate");
- }
- }
- if (StringUtils.isNotEmpty(endDate)) {
- end = DateUtils.getScheduleDate(endDate);
- if (end == null) {
- return generateInvalidParamRes(result, "endDate");
- }
- }
-
- Page<TaskInstance> page = new Page(pageNo, pageSize);
- PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
- int executorId = usersService.getUserIdByName(executorName);
-
- IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
- page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end
- );
- Set<String> exclusionSet = new HashSet<>();
- exclusionSet.add(Constants.CLASS);
- exclusionSet.add("taskJson");
- List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
-
- for (TaskInstance taskInstance : taskInstanceList) {
- taskInstance.setDuration(DateUtils.format2Duration(taskInstance.getStartTime(), taskInstance.getEndTime()));
- User executor = usersService.queryUser(taskInstance.getExecutorId());
- if (null != executor) {
- taskInstance.setExecutorName(executor.getUserName());
- }
- }
- pageInfo.setTotalCount((int) taskInstanceIPage.getTotal());
- pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet));
- result.put(Constants.DATA_LIST, pageInfo);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
+ Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
+ Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
+ String endDate, String searchVal, ExecutionStatus stateType, String host,
+ Integer pageNo, Integer pageSize);
/**
* change one task instance's state from failure to forced success
@@ -153,51 +56,6 @@ public class TaskInstanceService extends BaseService {
* @param taskInstanceId task instance id
* @return the result code and msg
*/
- public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
- Map<String, Object> result = new HashMap<>(5);
- Project project = projectMapper.queryByName(projectName);
-
- // check user auth
- Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
- Status status = (Status) checkResult.get(Constants.STATUS);
- if (status != Status.SUCCESS) {
- return checkResult;
- }
+ Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId);
- // check whether the task instance can be found
- TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
- if (task == null) {
- putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
- return result;
- }
-
- // check whether the task instance state type is failure
- if (!task.getState().typeIsFailure()) {
- putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
- return result;
- }
-
- // change the state of the task instance
- task.setState(ExecutionStatus.FORCED_SUCCESS);
- int changedNum = taskInstanceMapper.updateById(task);
- if (changedNum > 0) {
- putMsg(result, Status.SUCCESS);
- } else {
- putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
- }
-
- return result;
- }
-
- /***
- * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
- * @param result exist result map
- * @param params invalid params name
- * @return update result map
- */
- private Map<String, Object> generateInvalidParamRes(Map<String, Object> result, String params) {
- result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
- result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), params));
- return result;
- }
}
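One practical payoff of depending on the interface is that API callers can be tested without a Spring context or database. A minimal illustrative sketch, not the project's actual test code, assuming Mockito is on the classpath:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;

import org.apache.dolphinscheduler.api.service.TaskInstanceService;

public class TaskInstanceServiceMockSketch {
    public static void main(String[] args) {
        // stub the interface contract directly; no impl, mappers or wiring involved
        TaskInstanceService service = mock(TaskInstanceService.class);
        when(service.forceTaskSuccess(null, "demo-project", 42))
                .thenReturn(Collections.singletonMap("status", "SUCCESS"));

        Map<String, Object> result = service.forceTaskSuccess(null, "demo-project", 42);
        System.out.println(result); // prints {status=SUCCESS}
    }
}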
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
index da85621..ba0d32e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
@@ -61,7 +61,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return token list for page number and page size
*/
public Map<String, Object> queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
PageInfo<AccessToken> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<AccessToken> page = new Page<>(pageNo, pageSize);
@@ -87,7 +87,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return create result code
*/
public Map<String, Object> createToken(User loginUser, int userId, String expireTime, String token) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (!hasPerm(loginUser,userId)){
putMsg(result, Status.USER_NO_OPERATION_PERM);
@@ -124,7 +124,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return token string
*/
public Map<String, Object> generateToken(User loginUser, int userId, String expireTime) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (!hasPerm(loginUser,userId)){
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
@@ -143,7 +143,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return delete result code
*/
public Map<String, Object> delAccessTokenById(User loginUser, int id) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
AccessToken accessToken = accessTokenMapper.selectById(id);
@@ -174,7 +174,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return update result code
*/
public Map<String, Object> updateToken(User loginUser, int id, int userId, String expireTime, String token) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (!hasPerm(loginUser,userId)){
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
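The recurring change from new HashMap<>(5) to new HashMap<>() in this and the following files is not just churn: HashMap rounds a requested capacity up to the next power of two and resizes only when size crosses capacity times the load factor, so a hint of 5 behaves exactly like 8 and buys nothing for result maps that hold two or three keys. For illustration:

import java.util.HashMap;
import java.util.Map;

public class MapCapacitySketch {
    public static void main(String[] args) {
        // a capacity hint of 5 is rounded up to a table of 8; with the default
        // load factor of 0.75 it still resizes once a 7th entry arrives
        Map<String, Object> hinted = new HashMap<>(5);
        // the default constructor allocates a table of 16 lazily on first put,
        // equally cheap for a small status-and-data result map
        Map<String, Object> plain = new HashMap<>();
        hinted.put("status", "SUCCESS");
        plain.put("status", "SUCCESS");
    }
}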
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
index 7254fc1..b84c279 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
@@ -130,7 +130,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate
, TriFunction<Date, Date, Integer[], List<ExecuteStatusCount>> instanceStateCounter) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
boolean checkProject = checkProject(loginUser, projectId, result);
if (!checkProject) {
return result;
@@ -193,7 +193,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
*/
public Map<String, Object> countCommandState(User loginUser, int projectId, String startDate, String endDate) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
boolean checkProject = checkProject(loginUser, projectId, result);
if (!checkProject) {
return result;
@@ -264,7 +264,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
* @return queue state count data
*/
public Map<String, Object> countQueueState(User loginUser, int projectId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
boolean checkProject = checkProject(loginUser, projectId, result);
if (!checkProject) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2a2ae78..3a92bef 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -146,6 +146,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired
private ProcessService processService;
+ @Autowired
+ private SchedulerService schedulerService;
+
/**
* create process definition
*
@@ -273,7 +276,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Override
public Map<String, Object> queryProcessDefinitionList(User loginUser, String projectName) {
- HashMap<String, Object> result = new HashMap<>(5);
+ HashMap<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@@ -399,7 +402,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
String desc,
String locations,
String connects) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@@ -514,7 +517,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@@ -634,7 +637,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
// set status
schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule);
- SchedulerService.deleteSchedule(project.getId(), schedule.getId());
+ schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
break;
default:
@@ -823,7 +826,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
String processMetaJson = FileUtils.file2String(file);
List<ProcessMeta> processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class);
@@ -992,7 +995,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
}
//recursive sub-process parameter correction map key for old process id value for new process id
- Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
+ Map<Integer, Integer> subProcessIdMap = new HashMap<>();
List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements())
.filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText()))
@@ -1283,7 +1286,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Override
public Map<String, Object> queryProcessDefinitionAllByProjectId(Integer projectId) {
- HashMap<String, Object> result = new HashMap<>(5);
+ HashMap<String, Object> result = new HashMap<>();
List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(projectId);
result.put(Constants.DATA_LIST, resourceList);
@@ -1494,7 +1497,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
Integer processId,
Project targetProject) throws JsonProcessingException {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) {
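Two small edits in this file work together: SchedulerService is injected as a field above, and the former static call SchedulerService.deleteSchedule(...) becomes an instance call on that field, so the scheduler collaborator can now be replaced or mocked. A sketch of the resulting shape (offlineScheduleOf is an illustrative helper name, not in the commit; the real lines sit inside deleteProcessDefinitionById):

@Service
public class ProcessDefinitionServiceImpl extends BaseService implements ProcessDefinitionService {

    @Autowired
    private SchedulerService schedulerService; // interface type; Spring supplies the impl

    private void offlineScheduleOf(Project project, Schedule schedule) {
        schedule.setReleaseState(ReleaseState.OFFLINE);
        scheduleMapper.updateById(schedule);
        // instance call replaces the old static SchedulerService.deleteSchedule(...)
        schedulerService.deleteSchedule(project.getId(), schedule.getId());
    }
}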
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
similarity index 79%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index e7d8906..bcc37cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -14,27 +14,62 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.service;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.commons.collections.BeanMap;
+package org.apache.dolphinscheduler.api.service.impl;
+
+import static org.apache.dolphinscheduler.common.Constants.ALIAS;
+import static org.apache.dolphinscheduler.common.Constants.CONTENT;
+import static org.apache.dolphinscheduler.common.Constants.JAR;
+
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
+
+import org.apache.commons.beanutils.BeanMap;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,21 +78,17 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.stream.Collectors;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.SerializationFeature;
/**
- * resources service
+ * resources service impl
*/
@Service
-public class ResourcesService extends BaseService {
+public class ResourcesServiceImpl extends BaseService implements ResourcesService {
- private static final Logger logger = LoggerFactory.getLogger(ResourcesService.class);
+ private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceImpl.class);
@Autowired
private ResourceMapper resourcesMapper;
@@ -89,38 +120,21 @@ public class ResourcesService extends BaseService {
* @return create directory result
*/
@Transactional(rollbackFor = Exception.class)
- public Result createDirectory(User loginUser,
- String name,
- String description,
- ResourceType type,
- int pid,
- String currentDir) {
- Result result = new Result();
- // if hdfs not startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
+ public Result<Object> createDirectory(User loginUser,
+ String name,
+ String description,
+ ResourceType type,
+ int pid,
+ String currentDir) {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
- result = verifyResourceName(fullName,type,loginUser);
+ String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+ result = verifyResource(loginUser, type, fullName, pid);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- if (pid != -1) {
- Resource parentResource = resourcesMapper.selectById(pid);
-
- if (parentResource == null) {
- putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, parentResource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- }
-
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource directory {} has exist, can't recreate", fullName);
@@ -134,10 +148,9 @@ public class ResourcesService extends BaseService {
try {
resourcesMapper.insert(resource);
-
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<String, Object>();
+ Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
@@ -150,10 +163,10 @@ public class ResourcesService extends BaseService {
return result;
} catch (Exception e) {
logger.error("resource already exists, can't recreate ", e);
- throw new RuntimeException("resource already exists, can't recreate");
+ throw new ServiceException("resource already exists, can't recreate");
}
//create directory in hdfs
- createDirecotry(loginUser,fullName,type,result);
+ createDirectory(loginUser,fullName,type,result);
return result;
}
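Every public method in this class now opens with checkResourceUploadStartupState() instead of a copy-pasted HDFS-startup guard. Its body falls outside this excerpt, but judging from the inline code it replaces, it plausibly reads like this (a reconstruction, not a quote from the commit):

private Result<Object> checkResourceUploadStartupState() {
    Result<Object> result = new Result<>();
    putMsg(result, Status.SUCCESS);
    // if resource upload is not enabled (for example HDFS is not started), flag it once here
    if (!PropertyUtils.getResUploadStartupState()) {
        logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
        putMsg(result, Status.HDFS_NOT_STARTUP);
    }
    return result;
}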
@@ -170,73 +183,32 @@ public class ResourcesService extends BaseService {
* @return create result code
*/
@Transactional(rollbackFor = Exception.class)
- public Result createResource(User loginUser,
- String name,
- String desc,
- ResourceType type,
- MultipartFile file,
- int pid,
- String currentDir) {
- Result result = new Result();
-
- // if hdfs not startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
- return result;
- }
-
- if (pid != -1) {
- Resource parentResource = resourcesMapper.selectById(pid);
-
- if (parentResource == null) {
- putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, parentResource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- }
-
- // file is empty
- if (file.isEmpty()) {
- logger.error("file is empty: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
+ public Result<Object> createResource(User loginUser,
+ String name,
+ String desc,
+ ResourceType type,
+ MultipartFile file,
+ int pid,
+ String currentDir) {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- // file suffix
- String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
- String nameSuffix = FileUtils.suffix(name);
-
- // determine file suffix
- if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
- /**
- * rename file suffix and original suffix must be consistent
- */
- logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
+ result = verifyPid(loginUser, pid);
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
- if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
- logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
- putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
- return result;
- }
- if (file.getSize() > Constants.MAX_FILE_SIZE) {
- logger.error("file size is too large: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
+ result = verifyFile(name, type, file);
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- // check resoure name exists
- String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
+ // check resource name exists
+ String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
if (checkResourceExists(fullName, 0, type.ordinal())) {
- logger.error("resource {} has exist, can't recreate", name);
+ logger.error("resource {} has exist, can't recreate", RegexUtils.escapeNRT(name));
putMsg(result, Status.RESOURCE_EXIST);
return result;
}
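The new RegexUtils.escapeNRT(...) wrapping of user-supplied names in log calls, here and throughout the file, is a log-forging defense: a resource name containing \n or \r could otherwise inject fabricated log lines. The helper lives in the api.utils package per the import above; a plausible minimal implementation (the actual body may differ):

// in org.apache.dolphinscheduler.api.utils.RegexUtils
public static String escapeNRT(String value) {
    // neutralize newline, carriage return and tab so untrusted input
    // cannot break a log entry across lines
    if (value != null && !value.isEmpty()) {
        return value.replaceAll("[\n\r\t]", "_");
    }
    return null;
}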
@@ -246,7 +218,6 @@ public class ResourcesService extends BaseService {
try {
resourcesMapper.insert(resource);
-
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
Map<String, Object> resultMap = new HashMap<>();
@@ -258,14 +229,14 @@ public class ResourcesService extends BaseService {
result.setData(resultMap);
} catch (Exception e) {
logger.error("resource already exists, can't recreate ", e);
- throw new RuntimeException("resource already exists, can't recreate");
+ throw new ServiceException("resource already exists, can't recreate");
}
// fail upload
if (!upload(loginUser, fullName, file, type)) {
- logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
+ logger.error("upload resource: {} file: {} failed.", RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
+ throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
}
return result;
}
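The parallel change from throw new RuntimeException(...) to throw new ServiceException(...) keeps rollback behavior intact, since ServiceException extends RuntimeException and the methods are annotated @Transactional(rollbackFor = Exception.class), while giving the API layer a typed error it can translate into a response. A plausible minimal shape of the existing class (the real one may also carry a Status code):

public class ServiceException extends RuntimeException {

    public ServiceException(String message) {
        super(message);
    }
}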
@@ -278,13 +249,11 @@ public class ResourcesService extends BaseService {
* @param type type
* @return true if resource exists
*/
- private boolean checkResourceExists(String fullName, int userId, int type ){
-
+ private boolean checkResourceExists(String fullName, int userId, int type) {
List<Resource> resources = resourcesMapper.queryResourceList(fullName, userId, type);
- return resources != null && resources.size() > 0;
+ return resources != null && !resources.isEmpty();
}
-
/**
* update resource
* @param loginUser login user
@@ -296,18 +265,14 @@ public class ResourcesService extends BaseService {
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
- public Result updateResource(User loginUser,
- int resourceId,
- String name,
- String desc,
- ResourceType type,
- MultipartFile file) {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
+ public Result<Object> updateResource(User loginUser,
+ int resourceId,
+ String name,
+ String desc,
+ ResourceType type,
+ MultipartFile file) {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
@@ -326,56 +291,25 @@ public class ResourcesService extends BaseService {
return result;
}
- //check resource aleady exists
+ //check resource already exists
String originFullName = resource.getFullName();
String originResourceName = resource.getAlias();
- String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name);
+ String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/") + 1),name);
if (!originResourceName.equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource {} already exists, can't recreate", name);
putMsg(result, Status.RESOURCE_EXIST);
return result;
}
- if (file != null) {
-
- // file is empty
- if (file.isEmpty()) {
- logger.error("file is empty: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
- return result;
- }
-
- // file suffix
- String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
- String nameSuffix = FileUtils.suffix(name);
-
- // determine file suffix
- if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
- /**
- * rename file suffix and original suffix must be consistent
- */
- logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
- return result;
- }
-
- //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
- if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
- logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
- putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
- return result;
- }
- if (file.getSize() > Constants.MAX_FILE_SIZE) {
- logger.error("file size is too large: {}", file.getOriginalFilename());
- putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
- return result;
- }
+ result = verifyFile(name, type, file);
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+ return result;
}
// query tenant by user id
String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
+ if (StringUtils.isEmpty(tenantCode)) {
return result;
}
// verify whether the resource exists in storage
@@ -439,7 +373,7 @@ public class ResourcesService extends BaseService {
List<Integer> childrenResource = listAllChildren(resource,false);
if (CollectionUtils.isNotEmpty(childrenResource)) {
String matcherFullName = Matcher.quoteReplacement(fullName);
- List<Resource> childResourceList = new ArrayList<>();
+ List<Resource> childResourceList;
Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]);
List<Resource> resourceList = resourcesMapper.listResourceByIds(childResIdArray);
childResourceList = resourceList.stream().map(t -> {
@@ -476,7 +410,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<>(5);
+ Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
@@ -496,26 +430,24 @@ public class ResourcesService extends BaseService {
if (file != null) {
// fail upload
if (!upload(loginUser, fullName, file, type)) {
- logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
+ logger.error("upload resource: {} file: {} failed.", name, RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
+ throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
}
if (!fullName.equals(originFullName)) {
try {
HadoopUtils.getInstance().delete(originHdfsFileName,false);
} catch (IOException e) {
logger.error(e.getMessage(),e);
- throw new RuntimeException(String.format("delete resource: %s failed.", originFullName));
+ throw new ServiceException(String.format("delete resource: %s failed.", originFullName));
}
}
return result;
}
-
// get the path of dest file in hdfs
String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
-
try {
logger.info("start hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
HadoopUtils.getInstance().copy(originHdfsFileName, destHdfsFileName, true, true);
@@ -526,7 +458,44 @@ public class ResourcesService extends BaseService {
}
return result;
+ }
+
+ private Result<Object> verifyFile(String name, ResourceType type, MultipartFile file) {
+ Result<Object> result = new Result<>();
+ putMsg(result, Status.SUCCESS);
+ if (file != null) {
+ // file is empty
+ if (file.isEmpty()) {
+ logger.error("file is empty: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+ putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
+ return result;
+ }
+
+ // file suffix
+ String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
+ String nameSuffix = FileUtils.suffix(name);
+
+ // determine file suffix
+ if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
+ // rename file suffix and original suffix must be consistent
+ logger.error("rename file suffix and original suffix must be consistent: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+ putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
+ return result;
+ }
+ //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
+ if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
+ logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
+ putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
+ return result;
+ }
+ if (file.getSize() > Constants.MAX_FILE_SIZE) {
+ logger.error("file size is too large: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+ putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
+ return result;
+ }
+ }
+ return result;
}
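With the three copy-pasted validation blocks collapsed into verifyFile above, every upload entry point reduces to the same guard, as already visible in createResource and updateResource:

// shared guard; a null file skips the content checks and returns SUCCESS
Result<Object> result = verifyFile(name, type, file);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
    return result;
}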
/**
@@ -539,16 +508,16 @@ public class ResourcesService extends BaseService {
* @param pageSize page size
* @return resource list page
*/
- public Map<String, Object> queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
+ public Map<String, Object> queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
- HashMap<String, Object> result = new HashMap<>(5);
- Page<Resource> page = new Page(pageNo, pageSize);
+ HashMap<String, Object> result = new HashMap<>();
+ Page<Resource> page = new Page<>(pageNo, pageSize);
int userId = loginUser.getId();
if (isAdmin(loginUser)) {
- userId= 0;
+ userId = 0;
}
- if (direcotryId != -1) {
- Resource directory = resourcesMapper.selectById(direcotryId);
+ if (directoryId != -1) {
+ Resource directory = resourcesMapper.selectById(directoryId);
if (directory == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
@@ -556,8 +525,8 @@ public class ResourcesService extends BaseService {
}
IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
- userId,direcotryId, type.ordinal(), searchVal);
- PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
+ userId,directoryId, type.ordinal(), searchVal);
+ PageInfo<Resource> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int)resourceIPage.getTotal());
pageInfo.setLists(resourceIPage.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
@@ -566,14 +535,13 @@ public class ResourcesService extends BaseService {
}
/**
- * create direcoty
+ * create directory
* @param loginUser login user
* @param fullName full name
* @param type resource type
* @param result Result
*/
- private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){
- // query tenant
+ private void createDirectory(User loginUser,String fullName,ResourceType type,Result<Object> result) {
String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode);
@@ -585,12 +553,12 @@ public class ResourcesService extends BaseService {
if (!HadoopUtils.getInstance().mkdir(directoryName)) {
logger.error("create resource directory {} of hdfs failed",directoryName);
putMsg(result,Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
+ throw new ServiceException(String.format("create resource directory: %s failed.", directoryName));
}
} catch (Exception e) {
logger.error("create resource directory {} of hdfs failed",directoryName);
putMsg(result,Status.HDFS_OPERATION_ERROR);
- throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
+ throw new ServiceException(String.format("create resource directory: %s failed.", directoryName));
}
}
@@ -615,7 +583,6 @@ public class ResourcesService extends BaseService {
// random file name
String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
-
// save file to hdfs, and delete original file
String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode);
@@ -641,11 +608,10 @@ public class ResourcesService extends BaseService {
* @return resource list
*/
public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
-
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
int userId = loginUser.getId();
- if(isAdmin(loginUser)){
+ if (isAdmin(loginUser)) {
userId = 0;
}
List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
@@ -664,22 +630,21 @@ public class ResourcesService extends BaseService {
* @return resource list
*/
public Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
-
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
String suffix = ".jar";
int userId = loginUser.getId();
- if(isAdmin(loginUser)){
+ if (isAdmin(loginUser)) {
userId = 0;
}
if (programType != null) {
switch (programType) {
case JAVA:
- break;
case SCALA:
break;
case PYTHON:
suffix = ".py";
break;
+ default:
}
}
List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
@@ -697,23 +662,18 @@ public class ResourcesService extends BaseService {
* @param loginUser login user
* @param resourceId resource id
* @return delete result code
- * @throws Exception exception
+ * @throws IOException exception
*/
@Transactional(rollbackFor = Exception.class)
- public Result delete(User loginUser, int resourceId) throws Exception {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
+ public Result<Object> delete(User loginUser, int resourceId) throws IOException {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- //get resource and hdfs path
+ // get resource by id
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
- logger.error("resource file not exist, resource id {}", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -723,7 +683,7 @@ public class ResourcesService extends BaseService {
}
String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
+ if (StringUtils.isEmpty(tenantCode)) {
return result;
}
@@ -735,11 +695,11 @@ public class ResourcesService extends BaseService {
List<Integer> allChildren = listAllChildren(resource,true);
Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);
- //if resource type is UDF,need check whether it is bound by UDF functon
+ // if the resource type is UDF, check whether it is bound by any UDF function
if (resource.getType() == (ResourceType.UDF)) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
if (CollectionUtils.isNotEmpty(udfFuncs)) {
- logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
+ logger.error("can't be deleted,because it is bound by UDF functions:{}", udfFuncs);
putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
return result;
}
@@ -781,22 +741,22 @@ public class ResourcesService extends BaseService {
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
- public Result verifyResourceName(String fullName, ResourceType type,User loginUser) {
- Result result = new Result();
+ public Result<Object> verifyResourceName(String fullName, ResourceType type, User loginUser) {
+ Result<Object> result = new Result<>();
putMsg(result, Status.SUCCESS);
if (checkResourceExists(fullName, 0, type.ordinal())) {
- logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName);
+ logger.error("resource type:{} name:{} has exist, can't create again.", type, RegexUtils.escapeNRT(fullName));
putMsg(result, Status.RESOURCE_EXIST);
} else {
// query tenant
Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
- if(tenant != null){
+ if (tenant != null) {
String tenantCode = tenant.getTenantCode();
try {
String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
- if(HadoopUtils.getInstance().exists(hdfsFilename)){
- logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename);
+ if (HadoopUtils.getInstance().exists(hdfsFilename)) {
+ logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, RegexUtils.escapeNRT(fullName), hdfsFilename);
putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
}
@@ -804,12 +764,11 @@ public class ResourcesService extends BaseService {
logger.error(e.getMessage(),e);
putMsg(result,Status.HDFS_OPERATION_ERROR);
}
- }else{
+ } else {
putMsg(result,Status.TENANT_NOT_EXIST);
}
}
-
return result;
}
@@ -820,17 +779,15 @@ public class ResourcesService extends BaseService {
* @param type resource type
* @return true if the resource full name or pid not exists, otherwise return false
*/
- public Result queryResource(String fullName,Integer id,ResourceType type) {
- Result result = new Result();
+ public Result<Object> queryResource(String fullName,Integer id,ResourceType type) {
+ Result<Object> result = new Result<>();
if (StringUtils.isBlank(fullName) && id == null) {
- logger.error("You must input one of fullName and pid");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
return result;
}
if (StringUtils.isNotBlank(fullName)) {
List<Resource> resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
if (CollectionUtils.isEmpty(resourceList)) {
- logger.error("resource file not exist, resource full name {} ", fullName);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -839,13 +796,11 @@ public class ResourcesService extends BaseService {
} else {
Resource resource = resourcesMapper.selectById(id);
if (resource == null) {
- logger.error("resource file not exist, resource id {}", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
Resource parentResource = resourcesMapper.selectById(resource.getPid());
if (parentResource == null) {
- logger.error("parent resource file not exist, resource id {}", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -863,20 +818,15 @@ public class ResourcesService extends BaseService {
* @param limit limit
* @return resource content
*/
- public Result readResource(int resourceId, int skipLineNum, int limit) {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
+ public Result<Object> readResource(int resourceId, int skipLineNum, int limit) {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
// get resource by id
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
- logger.error("resource file not exist, resource id {}", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -893,15 +843,15 @@ public class ResourcesService extends BaseService {
}
String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
+ if (StringUtils.isEmpty(tenantCode)) {
return result;
}
// hdfs path
String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
- logger.info("resource hdfs path is {} ", hdfsFileName);
+ logger.info("resource hdfs path is {}", hdfsFileName);
try {
- if(HadoopUtils.getInstance().exists(hdfsFileName)){
+ if (HadoopUtils.getInstance().exists(hdfsFileName)) {
List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
putMsg(result, Status.SUCCESS);
@@ -909,7 +859,7 @@ public class ResourcesService extends BaseService {
map.put(ALIAS, resource.getAlias());
map.put(CONTENT, String.join("\n", content));
result.setData(map);
- }else{
+ } else {
logger.error("read file {} not exist in hdfs", hdfsFileName);
putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName);
}
@@ -931,15 +881,14 @@ public class ResourcesService extends BaseService {
* @param fileSuffix file suffix
* @param desc description
* @param content content
+ * @param pid parent resource id
+ * @param currentDir current directory
* @return create result code
*/
@Transactional(rollbackFor = Exception.class)
- public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
- Result result = new Result();
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
+ public Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content, int pid, String currentDir) {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
@@ -949,32 +898,18 @@ public class ResourcesService extends BaseService {
if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
if (!strList.contains(nameSuffix)) {
- logger.error("resouce suffix {} not support create", nameSuffix);
+ logger.error("resource suffix {} not support create", nameSuffix);
putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
return result;
}
}
String name = fileName.trim() + "." + nameSuffix;
- String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
-
- result = verifyResourceName(fullName,type,loginUser);
+ String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+ result = verifyResource(loginUser, type, fullName, pid);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
- if (pid != -1) {
- Resource parentResource = resourcesMapper.selectById(pid);
-
- if (parentResource == null) {
- putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, parentResource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- }
// save data
Date now = new Date();
@@ -996,7 +931,44 @@ public class ResourcesService extends BaseService {
result = uploadContentToHdfs(fullName, tenantCode, content);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
- throw new RuntimeException(result.getMsg());
+ throw new ServiceException(result.getMsg());
+ }
+ return result;
+ }
+
+ private Result<Object> checkResourceUploadStartupState() {
+ Result<Object> result = new Result<>();
+ putMsg(result, Status.SUCCESS);
+ // if resource upload startup
+ if (!PropertyUtils.getResUploadStartupState()) {
+ logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+ putMsg(result, Status.HDFS_NOT_STARTUP);
+ return result;
+ }
+ return result;
+ }
+
+ private Result<Object> verifyResource(User loginUser, ResourceType type, String fullName, int pid) {
+ Result<Object> result = verifyResourceName(fullName, type, loginUser);
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+ return result;
+ }
+ return verifyPid(loginUser, pid);
+ }
+
+ private Result<Object> verifyPid(User loginUser, int pid) {
+ Result<Object> result = new Result<>();
+ putMsg(result, Status.SUCCESS);
+ if (pid != -1) {
+ Resource parentResource = resourcesMapper.selectById(pid);
+ if (parentResource == null) {
+ putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
+ return result;
+ }
+ if (!hasPerm(loginUser, parentResource.getUserId())) {
+ putMsg(result, Status.USER_NO_OPERATION_PERM);
+ return result;
+ }
}
return result;
}
@@ -1009,13 +981,9 @@ public class ResourcesService extends BaseService {
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
- public Result updateResourceContent(int resourceId, String content) {
- Result result = new Result();
-
- // if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- putMsg(result, Status.HDFS_NOT_STARTUP);
+ public Result<Object> updateResourceContent(int resourceId, String content) {
+ Result<Object> result = checkResourceUploadStartupState();
+ if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
@@ -1038,17 +1006,16 @@ public class ResourcesService extends BaseService {
}
String tenantCode = getTenantCode(resource.getUserId(),result);
- if (StringUtils.isEmpty(tenantCode)){
+ if (StringUtils.isEmpty(tenantCode)) {
return result;
}
resource.setSize(content.getBytes().length);
resource.setUpdateTime(new Date());
resourcesMapper.updateById(resource);
-
result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
- throw new RuntimeException(result.getMsg());
+ throw new ServiceException(result.getMsg());
}
return result;
}
@@ -1059,8 +1026,8 @@ public class ResourcesService extends BaseService {
* @param content content
* @return result
*/
- private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) {
- Result result = new Result();
+ private Result<Object> uploadContentToHdfs(String resourceName, String tenantCode, String content) {
+ Result<Object> result = new Result<>();
String localFilename = "";
String hdfsFileName = "";
try {
@@ -1068,7 +1035,7 @@ public class ResourcesService extends BaseService {
if (!FileUtils.writeContent2File(content, localFilename)) {
// write file fail
- logger.error("file {} fail, content is {}", localFilename, content);
+ logger.error("file {} fail, content is {}", localFilename, RegexUtils.escapeNRT(content));
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -1076,7 +1043,7 @@ public class ResourcesService extends BaseService {
// get resource file hdfs path
hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
- logger.info("resource hdfs path is {} ", hdfsFileName);
+ logger.info("resource hdfs path is {}, resource dir is {}", hdfsFileName, resourcePath);
HadoopUtils hadoopUtils = HadoopUtils.getInstance();
if (!hadoopUtils.exists(resourcePath)) {
@@ -1098,19 +1065,18 @@ public class ResourcesService extends BaseService {
return result;
}
-
/**
* download file
*
* @param resourceId resource id
* @return resource content
- * @throws Exception exception
+ * @throws IOException exception
*/
- public org.springframework.core.io.Resource downloadResource(int resourceId) throws Exception {
+ public org.springframework.core.io.Resource downloadResource(int resourceId) throws IOException {
// if resource upload startup
- if (!PropertyUtils.getResUploadStartupState()){
+ if (!PropertyUtils.getResUploadStartupState()) {
logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
- throw new RuntimeException("hdfs not startup");
+ throw new ServiceException("hdfs not startup");
}
Resource resource = resourcesMapper.selectById(resourceId);
@@ -1120,20 +1086,20 @@ public class ResourcesService extends BaseService {
}
if (resource.isDirectory()) {
logger.error("resource id {} is directory,can't download it", resourceId);
- throw new RuntimeException("cant't download directory");
+ throw new ServiceException("can't download directory");
}
int userId = resource.getUserId();
User user = userMapper.selectById(userId);
- if(user == null){
+ if (user == null) {
logger.error("user id {} not exists", userId);
- throw new RuntimeException(String.format("resource owner id %d not exist",userId));
+ throw new ServiceException(String.format("resource owner id %d not exist",userId));
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
- if(tenant == null){
+ if (tenant == null) {
logger.error("tenant id {} not exists", user.getTenantId());
- throw new RuntimeException(String.format("The tenant id %d of resource owner not exist",user.getTenantId()));
+ throw new ServiceException(String.format("The tenant id %d of resource owner not exist",user.getTenantId()));
}
String tenantCode = tenant.getTenantCode();
@@ -1141,13 +1107,12 @@ public class ResourcesService extends BaseService {
String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
- logger.info("resource hdfs path is {} ", hdfsFileName);
+ logger.info("resource hdfs path is {}, download local filename is {}", hdfsFileName, localFileName);
HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true);
return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
}
-
/**
* list all file
*
@@ -1190,7 +1155,7 @@ public class ResourcesService extends BaseService {
}
List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
List<Resource> list;
- if (resourceList != null && resourceList.size() > 0) {
+ if (resourceList != null && !resourceList.isEmpty()) {
Set<Resource> resourceSet = new HashSet<>(resourceList);
List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
@@ -1213,7 +1178,7 @@ public class ResourcesService extends BaseService {
* @return unauthorized result code
*/
public Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (isNotAdmin(loginUser, result)) {
return result;
@@ -1221,7 +1186,7 @@ public class ResourcesService extends BaseService {
List<UdfFunc> udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId);
List<UdfFunc> resultList = new ArrayList<>();
- Set<UdfFunc> udfFuncSet = null;
+ Set<UdfFunc> udfFuncSet;
if (CollectionUtils.isNotEmpty(udfFuncList)) {
udfFuncSet = new HashSet<>(udfFuncList);
@@ -1235,7 +1200,6 @@ public class ResourcesService extends BaseService {
return result;
}
-
/**
* authorized udf function
*
@@ -1254,7 +1218,6 @@ public class ResourcesService extends BaseService {
return result;
}
-
/**
* authorized file
*
@@ -1263,7 +1226,7 @@ public class ResourcesService extends BaseService {
* @return authorized result
*/
public Map<String, Object> authorizedFile(User loginUser, Integer userId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
@@ -1285,7 +1248,7 @@ public class ResourcesService extends BaseService {
* @param authedResourceList authorized resource list
*/
private void getAuthorizedResourceList(Set<?> resourceSet, List<?> authedResourceList) {
- Set<?> authedResourceSet = null;
+ Set<?> authedResourceSet;
if (CollectionUtils.isNotEmpty(authedResourceList)) {
authedResourceSet = new HashSet<>(authedResourceList);
resourceSet.removeAll(authedResourceSet);
@@ -1297,10 +1260,9 @@ public class ResourcesService extends BaseService {
*
* @param userId user id
* @param result return result
- * @return
+ * @return tenant code
*/
- private String getTenantCode(int userId,Result result){
-
+ private String getTenantCode(int userId, Result<Object> result) {
User user = userMapper.selectById(userId);
if (user == null) {
logger.error("user {} not exists", userId);
@@ -1309,7 +1271,7 @@ public class ResourcesService extends BaseService {
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
- if (tenant == null){
+ if (tenant == null) {
logger.error("tenant not exists");
putMsg(result, Status.TENANT_NOT_EXIST);
return null;
@@ -1323,13 +1285,13 @@ public class ResourcesService extends BaseService {
* @param containSelf whether add self to children list
* @return all children id
*/
- List<Integer> listAllChildren(Resource resource,boolean containSelf){
+ List<Integer> listAllChildren(Resource resource, boolean containSelf) {
List<Integer> childList = new ArrayList<>();
if (resource.getId() != -1 && containSelf) {
childList.add(resource.getId());
}
- if(resource.isDirectory()){
+ if (resource.isDirectory()) {
listAllChildren(resource.getId(),childList);
}
return childList;
@@ -1340,12 +1302,11 @@ public class ResourcesService extends BaseService {
* @param resourceId resource id
* @param childList child list
*/
- void listAllChildren(int resourceId,List<Integer> childList){
-
+ void listAllChildren(int resourceId, List<Integer> childList) {
List<Integer> children = resourcesMapper.listChildren(resourceId);
- for(int chlidId:children){
- childList.add(chlidId);
- listAllChildren(chlidId,childList);
+ for (int childId : children) {
+ childList.add(childId);
+ listAllChildren(childId, childList);
}
}
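The readResource/onlineCreateResource/updateResourceContent rewrites above all funnel through the new checkResourceUploadStartupState/verifyResource/verifyPid helpers: each precondition hands back a failed Result early, and callers bail out on any non-SUCCESS code instead of nesting the happy path. A minimal, self-contained sketch of that guard-clause shape, using simplified stand-ins for the DolphinScheduler Result and Status types (not the real classes):

    public class GuardClauseSketch {

        enum Status { SUCCESS, HDFS_NOT_STARTUP }

        // simplified stand-in for org.apache.dolphinscheduler.api.utils.Result
        static class Result<T> {
            Status status = Status.SUCCESS;
            T data;
        }

        // shared precondition, returned unchanged on success
        static Result<Object> checkUploadEnabled(boolean uploadEnabled) {
            Result<Object> result = new Result<>();
            if (!uploadEnabled) {
                result.status = Status.HDFS_NOT_STARTUP;
            }
            return result;
        }

        static Result<Object> readResource(boolean uploadEnabled) {
            Result<Object> result = checkUploadEnabled(uploadEnabled);
            if (result.status != Status.SUCCESS) {
                return result; // fail fast, no nested happy path
            }
            result.data = "file content";
            return result;
        }

        public static void main(String[] args) {
            System.out.println(readResource(false).status); // HDFS_NOT_STARTUP
            System.out.println(readResource(true).status);  // SUCCESS
        }
    }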
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
similarity index 94%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 55880ad..f6fd467 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -15,11 +15,16 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -61,12 +66,12 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
- * scheduler service
+ * scheduler service impl
*/
@Service
-public class SchedulerService extends BaseService {
+public class SchedulerServiceImpl extends BaseService implements SchedulerService {
- private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class);
+ private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
@Autowired
private ProjectService projectService;
@@ -113,7 +118,7 @@ public class SchedulerService extends BaseService {
Priority processInstancePriority,
String workerGroup) {
- Map<String, Object> result = new HashMap();
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
@@ -146,7 +151,7 @@ public class SchedulerService extends BaseService {
scheduleObj.setStartTime(scheduleParam.getStartTime());
scheduleObj.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
- logger.error(scheduleParam.getCrontab() + " verify failure");
+ logger.error("{} verify failure", scheduleParam.getCrontab());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
return result;
@@ -204,7 +209,7 @@ public class SchedulerService extends BaseService {
ReleaseState scheduleStatus,
Priority processInstancePriority,
String workerGroup) {
- Map<String, Object> result = new HashMap<String, Object>(5);
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
@@ -298,8 +303,7 @@ public class SchedulerService extends BaseService {
String projectName,
Integer id,
ReleaseState scheduleStatus) {
-
- Map<String, Object> result = new HashMap<String, Object>(5);
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
@@ -340,10 +344,10 @@ public class SchedulerService extends BaseService {
List<Integer> subProcessDefineIds = new ArrayList<>();
processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
- if (subProcessDefineIds.size() > 0) {
+ if (!subProcessDefineIds.isEmpty()) {
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryDefinitionListByIdList(idArray);
- if (subProcessDefinitionList != null && subProcessDefinitionList.size() > 0) {
+ if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
/**
* if there is no online process, exit directly
@@ -362,7 +366,7 @@ public class SchedulerService extends BaseService {
// check master server exists
List<Server> masterServers = monitorService.getServerListFromZK(true);
- if (masterServers.size() == 0) {
+ if (masterServers.isEmpty()) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
}
@@ -374,20 +378,17 @@ public class SchedulerService extends BaseService {
try {
switch (scheduleStatus) {
- case ONLINE: {
+ case ONLINE:
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), scheduleObj);
break;
- }
- case OFFLINE: {
+ case OFFLINE:
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
- }
- default: {
+ default:
putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
return result;
- }
}
} catch (Exception e) {
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
@@ -426,12 +427,12 @@ public class SchedulerService extends BaseService {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
return result;
}
- Page<Schedule> page = new Page(pageNo, pageSize);
+ Page<Schedule> page = new Page<>(pageNo, pageSize);
IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(
page, processDefineId, searchVal
);
- PageInfo pageInfo = new PageInfo<Schedule>(pageNo, pageSize);
+ PageInfo<Schedule> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int) scheduleIPage.getTotal());
pageInfo.setLists(scheduleIPage.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
@@ -466,7 +467,6 @@ public class SchedulerService extends BaseService {
}
public void setSchedule(int projectId, Schedule schedule) {
-
int scheduleId = schedule.getId();
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
@@ -490,7 +490,7 @@ public class SchedulerService extends BaseService {
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
- public static void deleteSchedule(int projectId, int scheduleId) {
+ public void deleteSchedule(int projectId, int scheduleId) {
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
String jobName = QuartzExecutors.buildJobName(scheduleId);
@@ -593,8 +593,8 @@ public class SchedulerService extends BaseService {
return result;
}
List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
- result.put(Constants.DATA_LIST, selfFireDateList.stream().map(t -> DateUtils.dateToString(t)));
+ result.put(Constants.DATA_LIST, selfFireDateList.stream().map(DateUtils::dateToString));
putMsg(result, Status.SUCCESS);
return result;
}
-}
\ No newline at end of file
+}
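The file move above is the heart of the change: SchedulerService becomes a plain interface holding the API surface, while the @Service-annotated SchedulerServiceImpl carries the logic, so callers depend only on the contract. The same split in miniature, with illustrative names (GreetingService and friends are not DolphinScheduler classes), assuming spring-context on the classpath:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    // the contract callers program against
    interface GreetingService {
        String greet(String name);
    }

    // the Spring-managed implementation, picked up by component scanning
    @Service
    class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello, " + name;
        }
    }

    // consumers autowire the interface, never the concrete class
    class GreetingController {
        @Autowired
        private GreetingService greetingService;
    }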
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
similarity index 91%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 6c68202..6c91e20 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -15,9 +15,14 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.TaskInstanceService;
+import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -46,10 +51,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
- * task instance service
+ * task instance service impl
*/
@Service
-public class TaskInstanceService extends BaseService {
+public class TaskInstanceServiceImpl extends BaseService implements TaskInstanceService {
@Autowired
ProjectMapper projectMapper;
@@ -118,8 +123,8 @@ public class TaskInstanceService extends BaseService {
}
}
- Page<TaskInstance> page = new Page(pageNo, pageSize);
- PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
+ Page<TaskInstance> page = new Page<>(pageNo, pageSize);
+ PageInfo<Map<String, Object>> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
@@ -154,7 +159,7 @@ public class TaskInstanceService extends BaseService {
* @return the result code and msg
*/
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check user auth
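The Page/PageInfo edits in the TaskInstanceServiceImpl diff above (and the matching ones in SchedulerServiceImpl) replace raw types with parameterized ones, trading unchecked warnings for compile-time element checks. A tiny, generic illustration of the difference, independent of the DolphinScheduler types:

    import java.util.ArrayList;
    import java.util.List;

    public class RawTypeDemo {
        @SuppressWarnings({"rawtypes", "unchecked"})
        public static void main(String[] args) {
            List raw = new ArrayList();             // raw type: accepts anything, warns at compile time
            raw.add(42);
            raw.add("forty-two");

            List<String> typed = new ArrayList<>(); // diamond: element type fixed and checked
            typed.add("forty-two");
            // typed.add(42);                       // would not compile

            System.out.println(raw.size() + " " + typed.size());
        }
    }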
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 8aafe91..9f23428 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -87,7 +87,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
int queueId,
String desc) throws Exception {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false);
if (isNotAdmin(loginUser, result)) {
return result;
@@ -140,7 +140,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/
public Map<String, Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
@@ -171,7 +171,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
public Map<String, Object> updateTenant(User loginUser, int id, String tenantCode, int queueId,
String desc) throws Exception {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false);
if (isNotAdmin(loginUser, result)) {
@@ -233,7 +233,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
@@ -291,7 +291,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/
public Map<String, Object> queryTenantList(String tenantCode) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
List<Tenant> resourceList = tenantMapper.queryByTenantCode(tenantCode);
if (CollectionUtils.isNotEmpty(resourceList)) {
@@ -311,7 +311,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/
public Map<String, Object> queryTenantList(User loginUser) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
List<Tenant> resourceList = tenantMapper.selectList(null);
result.put(Constants.DATA_LIST, resourceList);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index d1416df..58e4912 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -132,8 +132,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
String phone,
String queue,
int state) throws IOException {
-
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//check all user params
String msg = this.checkUserParams(userName, userPassword, email, phone);
@@ -295,7 +294,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return user list page
*/
public Map<String, Object> queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@@ -337,7 +336,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
String phone,
String queue,
int state) throws IOException {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false);
if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) {
@@ -461,7 +460,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @throws Exception exception when operate hdfs
*/
public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM, id);
@@ -501,7 +500,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false);
//only admin can operate
@@ -550,7 +549,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@@ -645,7 +644,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
@@ -691,7 +690,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false);
//only admin can operate
@@ -771,7 +770,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return user list
*/
public Map<String, Object> queryAllGeneralUsers(User loginUser) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@@ -791,7 +790,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return user list
*/
public Map<String, Object> queryUserList(User loginUser) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@@ -832,7 +831,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/
public Map<String, Object> unauthorizedUser(User loginUser, Integer alertgroupId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
@@ -867,7 +866,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return authorized result code
*/
public Map<String, Object> authorizedUser(User loginUser, Integer alertgroupId) {
- Map<String, Object> result = new HashMap<>(5);
+ Map<String, Object> result = new HashMap<>();
//only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result;
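A recurring edit across TenantServiceImpl and UsersServiceImpl is new HashMap<>(5) becoming new HashMap<>(). The literal 5 was noise: HashMap rounds a requested capacity up to a power of two (8 here), and these result maps only ever hold a handful of entries, so no resize was being avoided. A quick illustration (CapacityDemo is a hypothetical name):

    import java.util.HashMap;
    import java.util.Map;

    public class CapacityDemo {
        public static void main(String[] args) {
            Map<String, Object> sized = new HashMap<>(5); // internal table rounded up to 8
            Map<String, Object> plain = new HashMap<>();  // default table of 16, allocated lazily
            sized.put("status", true);
            plain.put("status", true);
            // capacity is an invisible internal detail; contents and behavior match
            System.out.println(sized.equals(plain)); // true
        }
    }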
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
index 9ff7fac..482cb55 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
@@ -44,4 +44,13 @@ public class RegexUtils {
Matcher isNum = pattern.matcher(str);
return isNum.matches();
}
+
+ public static String escapeNRT(String str) {
+ // Logging should not be vulnerable to injection attacks: Replace pattern-breaking characters
+ if (str != null && !str.isEmpty()) {
+ return str.replaceAll("[\n|\r|\t]", "_");
+ }
+ return null;
+ }
+
}
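RegexUtils.escapeNRT exists so user-controlled strings logged by the services above cannot forge extra log records: newlines, carriage returns and tabs are flattened to underscores before reaching the logger. Note the character class is [\n\r\t]; writing it with | separators would make the pipe character itself a match. A quick usage sketch, assuming this class is on the classpath:

    import org.apache.dolphinscheduler.api.utils.RegexUtils;

    public class EscapeDemo {
        public static void main(String[] args) {
            String userInput = "app.sh\n2021-02-18 ERROR forged record";
            // the embedded newline is replaced, so the payload cannot split the log entry
            System.out.println(RegexUtils.escapeNRT(userInput));
            // prints: app.sh_2021-02-18 ERROR forged record
        }
    }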
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index d430d3a..7b9c869 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@@ -68,7 +69,7 @@ public class ResourcesServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class);
@InjectMocks
- private ResourcesService resourcesService;
+ private ResourcesServiceImpl resourcesService;
@Mock
private ResourceMapper resourcesMapper;
@Mock
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index deadc21..389afed 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.model.Server;
@@ -53,7 +54,7 @@ public class SchedulerServiceTest {
@InjectMocks
- private SchedulerService schedulerService;
+ private SchedulerServiceImpl schedulerService;
@Mock
private MonitorService monitorService;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index b1989b4..0f0071c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
@@ -62,7 +63,7 @@ public class TaskInstanceServiceTest {
private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceTest.class);
@InjectMocks
- private TaskInstanceService taskInstanceService;
+ private TaskInstanceServiceImpl taskInstanceService;
@Mock
ProjectMapper projectMapper;
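All three test diffs make the same move: @InjectMocks now names the Impl class, because Mockito must instantiate that field and an interface has no constructor. The pattern in a self-contained JUnit 4 sketch, with illustrative Clock/Stamper types in place of the real services:

    import static org.junit.Assert.assertEquals;
    import static org.mockito.Mockito.when;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.mockito.InjectMocks;
    import org.mockito.Mock;
    import org.mockito.junit.MockitoJUnitRunner;

    @RunWith(MockitoJUnitRunner.class)
    public class StamperTest {

        interface Clock { long now(); }

        static class Stamper {
            private Clock clock;
            String stamp(String msg) { return msg + "@" + clock.now(); }
        }

        @Mock
        private Clock clock;

        @InjectMocks
        private Stamper stamper; // must be a concrete class, not the interface

        @Test
        public void stampsWithMockedTime() {
            when(clock.now()).thenReturn(42L);
            assertEquals("hi@42", stamper.stamp("hi"));
        }
    }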
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6bed928..de2d62d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -50,12 +50,8 @@ public class StringUtils {
return !isBlank(s);
}
- public static String replaceNRTtoUnderline(String src) {
- if (isBlank(src)) {
- return src;
- } else {
- return src.replaceAll("[\n|\r|\t]", "_");
- }
+ public static String replaceNRTtoUnderline(String str) {
+ return isBlank(str) ? str : str.replaceAll("[\n|\r|\t]", "_");
}
public static String trim(String str) {