You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/22 03:45:55 UTC

[dolphinscheduler] branch dev updated: [Improvement-9609][Worker]The resource download method is selected according to the configurati… (#9636)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 996790ce9e [Improvement-9609][Worker]The resource download method is selected according to the configurati… (#9636)
996790ce9e is described below

commit 996790ce9ea48bd96aed6687d31afe931fc70893
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Fri Apr 22 11:45:49 2022 +0800

    [Improvement-9609][Worker]The resource download method is selected according to the configurati… (#9636)
    
    * The resource download method is selected according to the configuration and the service startup verification is added.
    
    * common check CI fix
    
    * Startup check changed to running check
    
    * code smell
    
    * Coordinate resources to increase test coverage.
    
    * Split resource download method.
    
    * Unit Test Coverage
    
    Co-authored-by: WangJPLeo <wa...@whaleops.com>
---
 .../api/service/impl/ResourcesServiceImpl.java     | 16 ++---
 .../common/config/StoreConfiguration.java          |  4 +-
 .../StorageOperateNoConfiguredException.java}      | 30 +++++---
 .../common/storage/StorageOperateManager.java      | 48 -------------
 .../dolphinscheduler/common/utils/HadoopUtils.java | 12 ++--
 .../common/utils/PropertyUtils.java                |  2 +-
 .../dolphinscheduler/common/utils/S3Utils.java     | 27 ++++++--
 .../common/exception/ExceptionTest.java            | 43 ++++++++++++
 .../common/storage/StorageOperateManagerTest.java  | 50 --------------
 .../common/utils/PropertyUtilsTest.java            |  7 ++
 .../worker/processor/TaskExecuteProcessor.java     |  4 +-
 .../server/worker/runner/TaskExecuteThread.java    | 79 ++++++++++++----------
 .../worker/runner/TaskExecuteThreadTest.java       | 76 +++++++++++++++++++++
 13 files changed, 233 insertions(+), 165 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 98d7bfae36..d7439053ce 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -66,16 +66,7 @@ import org.springframework.web.multipart.MultipartFile;
 import java.io.IOException;
 import java.rmi.ServerException;
 import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
@@ -327,6 +318,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
             return result;
         }
 
+        if (!PropertyUtils.getResUploadStartupState()){
+            putMsg(result, Status.STORAGE_NOT_STARTUP);
+            return result;
+        }
+
         if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3) && !resource.getFileName().equals(name)) {
             putMsg(result, Status.S3_CANNOT_RENAME);
             return result;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
index 9a7ea53fe0..14a74d69f0 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
@@ -25,7 +25,9 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.stereotype.Component;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.STORAGE_HDFS;
+import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
 
 
 /**
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
similarity index 52%
copy from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
index 14279b33d5..3aaf7980d0 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
@@ -15,17 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.utils;
+package org.apache.dolphinscheduler.common.exception;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.junit.Test;
+/**
+ * exception for store
+ */
+public class StorageOperateNoConfiguredException extends RuntimeException {
+
+    public StorageOperateNoConfiguredException() {
+    }
 
-import static org.junit.Assert.assertNotNull;
+    public StorageOperateNoConfiguredException(String message) {
+        super(message);
+    }
 
-public class PropertyUtilsTest {
+    public StorageOperateNoConfiguredException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public StorageOperateNoConfiguredException(Throwable cause) {
+        super(cause);
+    }
 
-    @Test
-    public void getString() {
-        assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
+    public StorageOperateNoConfiguredException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java
deleted file mode 100644
index 184a264899..0000000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.storage;
-
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-
-import java.util.EnumMap;
-import java.util.Objects;
-import java.util.ServiceLoader;
-/**
- * @author Storage Operate Manager
- */
-public class StorageOperateManager {
-
-    public static final EnumMap<ResUploadType, StorageOperate> OPERATE_MAP = new EnumMap<>(ResUploadType.class);
-
-    static {
-        ServiceLoader<StorageOperate> load = ServiceLoader.load(StorageOperate.class);
-        for (StorageOperate storageOperate : load) {
-            OPERATE_MAP.put(storageOperate.returnStorageType(), storageOperate);
-        }
-    }
-
-    public static StorageOperate getStorageOperate(ResUploadType resUploadType) {
-        if (Objects.isNull(resUploadType)){
-            resUploadType = ResUploadType.HDFS;
-        }
-        StorageOperate storageOperate = OPERATE_MAP.get(resUploadType);
-        if (Objects.isNull(storageOperate)){
-            storageOperate = OPERATE_MAP.get(ResUploadType.HDFS);
-        }
-        return storageOperate;
-    }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 8ac51c67ba..ec4286e8a1 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -30,14 +30,16 @@ import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
 import java.io.*;
 import java.nio.charset.StandardCharsets;
@@ -50,13 +52,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
 
 /**
  * hadoop utils
  * single instance
  */
-@Component
 public class HadoopUtils implements Closeable, StorageOperate {
 
     private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
index 8c316c158e..d253392cdf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
@@ -69,7 +69,7 @@ public class PropertyUtils {
      */
     public static boolean getResUploadStartupState() {
         String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
-        ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
+        ResUploadType resUploadType = ResUploadType.valueOf(StringUtils.isEmpty(resUploadStartupType) ? ResUploadType.NONE.name() : resUploadStartupType);
         return resUploadType != ResUploadType.NONE;
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
index 80fb464661..13e17c3bba 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
@@ -24,7 +24,11 @@ import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.*;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.transfer.MultipleFileDownload;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
@@ -36,17 +40,30 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.jets3t.service.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
+import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
+import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
 
-@Component
 public class S3Utils implements Closeable, StorageOperate {
 
     private static final Logger logger = LoggerFactory.getLogger(S3Utils.class);
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java
new file mode 100644
index 0000000000..6a670c9b61
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.exception;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExceptionTest {
+
+    @Test
+    public void testException(){
+        final String message = "Test";
+        RuntimeException time = new RuntimeException(message);
+
+        Assert.assertNull(new BaseException().getMessage());
+        Assert.assertNotNull(new BaseException(message).getMessage());
+        Assert.assertNotNull(new BaseException(message, time).getMessage());
+        Assert.assertNotNull(new BaseException(time).getCause());
+        Assert.assertNotNull(new BaseException(message, time, false, false).getMessage());
+
+        Assert.assertNull(new StorageOperateNoConfiguredException().getMessage());
+        Assert.assertNotNull(new StorageOperateNoConfiguredException(message).getMessage());
+        Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time).getMessage());
+        Assert.assertNotNull(new StorageOperateNoConfiguredException(time).getCause());
+        Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time, false, false).getMessage());
+    }
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java
deleted file mode 100644
index 05f2e45a73..0000000000
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.storage;
-
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.EnumMap;
-
-/**
- * @author StorageOperateManagerTest
- */
-@RunWith(MockitoJUnitRunner.class)
-public class StorageOperateManagerTest {
-
-    @Mock
-    private HadoopUtils hadoopUtils;
-
-    @Test
-    public void testManager() {
-        StorageOperateManager mock = Mockito.mock(StorageOperateManager.class);
-        Assert.assertNotNull(mock);
-
-        EnumMap<ResUploadType, StorageOperate> storageOperateMap = StorageOperateManager.OPERATE_MAP;
-        storageOperateMap.put(ResUploadType.HDFS, hadoopUtils);
-
-        StorageOperate storageOperate = StorageOperateManager.getStorageOperate(ResUploadType.HDFS);
-        Assert.assertNotNull(storageOperate);
-    }
-}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
index 14279b33d5..df8050af41 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
@@ -18,6 +18,8 @@
 package org.apache.dolphinscheduler.common.utils;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.spi.enums.ResUploadType;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
@@ -28,4 +30,9 @@ public class PropertyUtilsTest {
     public void getString() {
         assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
     }
+
+    @Test
+    public void getResUploadStartupState(){
+        Assert.assertFalse(PropertyUtils.getResUploadStartupState());
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 0d2e4880aa..a376587aad 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -18,8 +18,6 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.common.storage.StorageOperateManager;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -163,7 +161,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         }
 
         // submit task to manager
-        boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, StorageOperateManager.getStorageOperate(ResUploadType.HDFS)));
+        boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
         if (!offer) {
             logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
                     workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index b58675b899..cc15eb62f4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -44,11 +43,7 @@ import org.apache.commons.lang.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -115,13 +110,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
     public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
                              TaskCallbackService taskCallbackService,
                              AlertClientService alertClientService,
-                             TaskPluginManager taskPluginManager,
-                             StorageOperate storageOperate) {
+                             TaskPluginManager taskPluginManager) {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
         this.taskPluginManager = taskPluginManager;
-        this.storageOperate = storageOperate;
     }
 
     @Override
@@ -147,7 +140,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
             taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
 
             // copy hdfs/minio file to local
-            downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
+            List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
+            if (!fileDownloads.isEmpty()){
+                downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
+            }
 
             taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
             taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@@ -277,34 +273,49 @@ public class TaskExecuteThread implements Runnable, Delayed {
      * download resource file
      *
      * @param execLocalPath execLocalPath
-     * @param projectRes projectRes
+     * @param fileDownloads projectRes
      * @param logger logger
      */
-    private void downloadResource(String execLocalPath, Map<String, String> projectRes, Logger logger) {
-        if (MapUtils.isEmpty(projectRes)) {
-            return;
+    public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
+        for (Pair<String, String> fileDownload : fileDownloads) {
+            try {
+                // query the tenant code of the resource according to the name of the resource
+                String fullName = fileDownload.getLeft();
+                String tenantCode = fileDownload.getRight();
+                String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
+                logger.info("get resource file from hdfs :{}", resHdfsPath);
+                storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                throw new ServiceException(e.getMessage());
+            }
         }
+    }
 
-        Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
-
-        for (Map.Entry<String, String> resource : resEntries) {
-            String fullName = resource.getKey();
-            String tenantCode = resource.getValue();
-            File resFile = new File(execLocalPath, fullName);
-            if (!resFile.exists()) {
-                try {
-                    // query the tenant code of the resource according to the name of the resource
-                    String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
-                    logger.info("get resource file from hdfs :{}", resHdfsPath);
-                    storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                    throw new ServiceException(e.getMessage());
-                }
-            } else {
+    /**
+     * download resource check
+     * @param execLocalPath
+     * @param projectRes
+     * @return
+     */
+    public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes){
+        if (MapUtils.isEmpty(projectRes)) {
+            return Collections.emptyList();
+        }
+        List<Pair<String, String>> downloadFile = new ArrayList<>();
+        projectRes.forEach((key, value) -> {
+            File resFile = new File(execLocalPath, key);
+            boolean notExist = !resFile.exists();
+            if (notExist){
+                downloadFile.add(Pair.of(key, value));
+            } else{
                 logger.info("file : {} exists ", resFile.getName());
             }
+        });
+        if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){
+            throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
         }
+        return downloadFile;
     }
 
     /**
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
new file mode 100644
index 0000000000..f847690a6c
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(PowerMockRunner.class)
+public class TaskExecuteThreadTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
+
+    @Mock
+    private TaskExecutionContext taskExecutionContext;
+
+    @Mock
+    private TaskCallbackService taskCallbackService;
+
+    @Mock
+    private AlertClientService alertClientService;
+
+    @Mock
+    private TaskPluginManager taskPluginManager;
+
+    @Test
+    public void checkTest(){
+        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);
+
+        String path = "/";
+        Map<String, String> projectRes = new HashMap<>();
+        projectRes.put("shell", "shell.sh");
+        List<Pair<String, String>> downloads = new ArrayList<>();
+        try{
+            downloads = taskExecuteThread.downloadCheck(path, projectRes);
+        }catch (Exception e){
+            Assert.assertNotNull(e);
+        }
+        downloads.add(Pair.of("shell", "shell.sh"));
+        try{
+            taskExecuteThread.downloadResource(path, LOGGER, downloads);
+        }catch (Exception e){
+
+        }
+    }
+}