You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/07/04 05:41:51 UTC

[dolphinscheduler] branch 2.0.6-prepare updated: cherry pick #9107 (#10756)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch 2.0.6-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.6-prepare by this push:
     new 5d34d9e270 cherry pick #9107 (#10756)
5d34d9e270 is described below

commit 5d34d9e270f98d8ea70cc162807f16c4b93cf289
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Jul 4 13:41:44 2022 +0800

    cherry pick #9107 (#10756)
---
 .../api/service/impl/ResourcesServiceImpl.java     | 94 +++++++++++++++++----
 .../dao/upgrade/DolphinSchedulerManager.java       |  8 +-
 .../dolphinscheduler/dao/upgrade/ResourceDao.java  | 96 +++++++++++++++++++++-
 .../dolphinscheduler/dao/upgrade/UpgradeDao.java   | 57 +++++++++----
 4 files changed, 221 insertions(+), 34 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 251b8039ee..48534e63a9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -17,13 +17,10 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.common.io.Files;
-import org.apache.commons.beanutils.BeanMap;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.ALIAS;
+import static org.apache.dolphinscheduler.common.Constants.CONTENT;
+import static org.apache.dolphinscheduler.common.Constants.JAR;
+
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
 import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
@@ -40,10 +37,38 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import org.apache.commons.beanutils.BeanMap;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -52,13 +77,11 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.multipart.MultipartFile;
 
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.stream.Collectors;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
 
 /**
  * resources service impl
@@ -198,6 +221,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
 
         try {
             resourcesMapper.insert(resource);
+            updateParentResourceSize(resource, resource.getSize());
             putMsg(result, Status.SUCCESS);
             Map<Object, Object> dataMap = new BeanMap(resource);
             Map<String, Object> resultMap = new HashMap<>();
@@ -221,6 +245,33 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
         return result;
     }
 
+    /**
+     * update the folder's size of the resource
+     *
+     * @param resource the current resource
+     * @param size size
+     */
+    private void updateParentResourceSize(Resource resource, long size) {
+        if (resource.getSize() > 0) {
+            String[] splits = resource.getFullName().split("/");
+            for (int i = 1; i < splits.length; i++) {
+                String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, i));
+                if (StringUtils.isNotBlank(parentFullName)) {
+                    List<Resource> resources = resourcesMapper.queryResource(parentFullName, resource.getType().ordinal());
+                    if (CollectionUtils.isNotEmpty(resources)) {
+                        Resource parentResource = resources.get(0);
+                        if (parentResource.getSize() + size >= 0) {
+                            parentResource.setSize(parentResource.getSize() + size);
+                        } else {
+                            parentResource.setSize(0L);
+                        }
+                        resourcesMapper.updateById(parentResource);
+                    }
+                }
+            }
+        }
+    }
+
     /**
      * check resource is exists
      *
@@ -338,6 +389,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
 
         // updateResource data
         Date now = new Date();
+        long originFileSize = resource.getSize();
 
         resource.setAlias(name);
         resource.setFileName(name);
@@ -423,6 +475,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
                     throw new ServiceException(String.format("delete resource: %s failed.", originFullName));
                 }
             }
+
+            updateParentResourceSize(resource, resource.getSize() - originFileSize);
             return result;
         }
 
@@ -705,11 +759,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
         String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
 
         //delete data in database
+        resourcesMapper.selectBatchIds(Arrays.asList(needDeleteResourceIdArray)).forEach(item -> {
+            updateParentResourceSize(item, item.getSize() * -1);
+        });
         resourcesMapper.deleteIds(needDeleteResourceIdArray);
         resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);
 
         //delete file on hdfs
         HadoopUtils.getInstance().delete(hdfsFilename, true);
+
         putMsg(result, Status.SUCCESS);
 
         return result;
@@ -901,6 +959,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
         Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
 
         resourcesMapper.insert(resource);
+        updateParentResourceSize(resource, resource.getSize());
 
         putMsg(result, Status.SUCCESS);
         Map<Object, Object> dataMap = new BeanMap(resource);
@@ -995,10 +1054,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
         if (StringUtils.isEmpty(tenantCode)) {
             return  result;
         }
+        long originFileSize = resource.getSize();
         resource.setSize(content.getBytes().length);
         resource.setUpdateTime(new Date());
         resourcesMapper.updateById(resource);
 
+        updateParentResourceSize(resource, resource.getSize() - originFileSize);
+
         result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             throw new ServiceException(result.getMsg());
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
index 9ac55469af..6743acb2e5 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
@@ -75,6 +75,7 @@ public class DolphinSchedulerManager {
         logger.info("Start initializing the DolphinScheduler manager table structure");
         upgradeDao.initSchema();
     }
+
     public void upgradeDolphinScheduler() throws IOException {
         // Gets a list of all upgrades
         List<String> schemaList = SchemaUtils.getAllSchemaList();
@@ -97,12 +98,13 @@ public class DolphinSchedulerManager {
             }
             // The target version of the upgrade
             String schemaVersion = "";
+            String currentVersion = version;
             for (String schemaDir : schemaList) {
                 schemaVersion = schemaDir.split("_")[0];
                 if (SchemaUtils.isAGreatVersion(schemaVersion, version)) {
                     logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion);
                     logger.info("Begin upgrading DolphinScheduler's table structure");
-                    upgradeDao.upgradeDolphinScheduler(schemaDir);
+                     upgradeDao.upgradeDolphinScheduler(schemaDir);
                     if ("1.3.0".equals(schemaVersion)) {
                         upgradeDao.upgradeDolphinSchedulerWorkerGroup();
                     } else if ("1.3.2".equals(schemaVersion)) {
@@ -113,6 +115,10 @@ public class DolphinSchedulerManager {
                     version = schemaVersion;
                 }
             }
+
+            if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion) && SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
+                upgradeDao.upgradeDolphinSchedulerResourceFileSize();
+            }
         }
 
         // Assign the value of the version field in the version table to the version of the product
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
index 49c0e80c48..7d587e708f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
@@ -18,14 +18,21 @@
 package org.apache.dolphinscheduler.dao.upgrade;
 
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
 
 /**
  * resource dao
@@ -65,4 +72,89 @@ public class ResourceDao {
         return resourceMap;
     }
 
+    /**
+     * list all resources by the type
+     *
+     * @param conn connection
+     * @return map that key is full_name and value is the folder's size
+     */
+    private Map<String, Long> listAllResourcesByFileType(Connection conn, int type) {
+        Map<String, Long> resourceSizeMap = new HashMap<>();
+
+        String sql = String.format("SELECT full_name, type, size, is_directory FROM t_ds_resources where type = %d", type);
+        ResultSet rs = null;
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement(sql);
+            rs = pstmt.executeQuery();
+
+            while (rs.next()) {
+                String fullName = rs.getString("full_name");
+                Boolean isDirectory = rs.getBoolean("is_directory");
+                long fileSize = rs.getLong("size");
+
+                if (StringUtils.isNotBlank(fullName) && !isDirectory) {
+                    String[] splits = fullName.split("/");
+                    for (int i = 1; i < splits.length; i++) {
+                        String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits,0, splits.length - i));
+                        if (StringUtils.isNotEmpty(parentFullName)) {
+                            long size = resourceSizeMap.getOrDefault(parentFullName, 0L);
+                            resourceSizeMap.put(parentFullName, size + fileSize);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            if (Objects.nonNull(pstmt)) {
+                try {
+                    if (!pstmt.isClosed()) {
+                        pstmt.close();
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        return resourceSizeMap;
+    }
+
+    /**
+     * update the folder's size
+     *
+     * @param conn connection
+     */
+    public void updateResourceFolderSizeByFileType(Connection conn, int type) {
+        Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn, type);
+
+        String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true";
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement(sql);
+            for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
+                pstmt.setLong(1, entry.getValue());
+                pstmt.setInt(2, type);
+                pstmt.setString(3, entry.getKey());
+                pstmt.addBatch();
+            }
+            pstmt.executeBatch();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            if (Objects.nonNull(pstmt)) {
+                try {
+                    if (!pstmt.isClosed()) {
+                        pstmt.close();
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            ConnectionUtils.releaseResource(conn);
+        }
+    }
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index da9c2e0638..0cc571bf02 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -17,15 +17,12 @@
 
 package org.apache.dolphinscheduler.dao.upgrade;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
@@ -37,12 +34,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
 
-import javax.sql.DataSource;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -51,9 +46,27 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public abstract class UpgradeDao {
     public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
     private static final String T_VERSION_NAME = "t_escheduler_version";
@@ -150,6 +163,21 @@ public abstract class UpgradeDao {
         upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
     }
 
+    /**
+     * upgrade DolphinScheduler to 2.0.6
+     */
+    public void upgradeDolphinSchedulerResourceFileSize() {
+        ResourceDao resourceDao = new ResourceDao();
+        try {
+            // update the size of the folder that is the type of file.
+            resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
+            // update the size of the folder that is the type of udf.
+            resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
+        } catch (Exception ex) {
+            logger.error("Failed to upgrade because of failing to update the folder's size of resource files.");
+        }
+    }
+
     /**
      * updateProcessDefinitionJsonWorkerGroup
      */
@@ -344,7 +372,6 @@ public abstract class UpgradeDao {
         }
     }
 
-
     /**
      * update version
      *