You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/22 03:45:55 UTC
[dolphinscheduler] branch dev updated: [Improvement-9609][Worker]The resource download method is selected according to the configuration… (#9636)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 996790ce9e [Improvement-9609][Worker]The resource download method is selected according to the configuration… (#9636)
996790ce9e is described below
commit 996790ce9ea48bd96aed6687d31afe931fc70893
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Fri Apr 22 11:45:49 2022 +0800
[Improvement-9609][Worker]The resource download method is selected according to the configuration… (#9636)
* The resource download method is selected according to the configuration and the service startup verification is added.
* common check CI fix
* Startup check changed to running check
* code smell
* Coordinate resources to increase test coverage.
* Split resource download method.
* Unit Test Coverage
Co-authored-by: WangJPLeo <wa...@whaleops.com>
---
.../api/service/impl/ResourcesServiceImpl.java | 16 ++---
.../common/config/StoreConfiguration.java | 4 +-
.../StorageOperateNoConfiguredException.java} | 30 +++++---
.../common/storage/StorageOperateManager.java | 48 -------------
.../dolphinscheduler/common/utils/HadoopUtils.java | 12 ++--
.../common/utils/PropertyUtils.java | 2 +-
.../dolphinscheduler/common/utils/S3Utils.java | 27 ++++++--
.../common/exception/ExceptionTest.java | 43 ++++++++++++
.../common/storage/StorageOperateManagerTest.java | 50 --------------
.../common/utils/PropertyUtilsTest.java | 7 ++
.../worker/processor/TaskExecuteProcessor.java | 4 +-
.../server/worker/runner/TaskExecuteThread.java | 79 ++++++++++++----------
.../worker/runner/TaskExecuteThreadTest.java | 76 +++++++++++++++++++++
13 files changed, 233 insertions(+), 165 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 98d7bfae36..d7439053ce 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -66,16 +66,7 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.rmi.ServerException;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
@@ -327,6 +318,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
+ if (!PropertyUtils.getResUploadStartupState()){
+ putMsg(result, Status.STORAGE_NOT_STARTUP);
+ return result;
+ }
+
if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3) && !resource.getFileName().equals(name)) {
putMsg(result, Status.S3_CANNOT_RENAME);
return result;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
index 9a7ea53fe0..14a74d69f0 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
@@ -25,7 +25,9 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.STORAGE_HDFS;
+import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
/**
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
similarity index 52%
copy from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
index 14279b33d5..3aaf7980d0 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
@@ -15,17 +15,29 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.utils;
+package org.apache.dolphinscheduler.common.exception;
-import org.apache.dolphinscheduler.common.Constants;
-import org.junit.Test;
+/**
+ * Thrown when a resource storage operation is required but no storage service is configured.
+ */
+public class StorageOperateNoConfiguredException extends RuntimeException {
+
+ public StorageOperateNoConfiguredException() {
+ }
-import static org.junit.Assert.assertNotNull;
+ public StorageOperateNoConfiguredException(String message) {
+ super(message);
+ }
-public class PropertyUtilsTest {
+ public StorageOperateNoConfiguredException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StorageOperateNoConfiguredException(Throwable cause) {
+ super(cause);
+ }
- @Test
- public void getString() {
- assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
+ public StorageOperateNoConfiguredException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java
deleted file mode 100644
index 184a264899..0000000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.storage;
-
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-
-import java.util.EnumMap;
-import java.util.Objects;
-import java.util.ServiceLoader;
-/**
- * @author Storage Operate Manager
- */
-public class StorageOperateManager {
-
- public static final EnumMap<ResUploadType, StorageOperate> OPERATE_MAP = new EnumMap<>(ResUploadType.class);
-
- static {
- ServiceLoader<StorageOperate> load = ServiceLoader.load(StorageOperate.class);
- for (StorageOperate storageOperate : load) {
- OPERATE_MAP.put(storageOperate.returnStorageType(), storageOperate);
- }
- }
-
- public static StorageOperate getStorageOperate(ResUploadType resUploadType) {
- if (Objects.isNull(resUploadType)){
- resUploadType = ResUploadType.HDFS;
- }
- StorageOperate storageOperate = OPERATE_MAP.get(resUploadType);
- if (Objects.isNull(storageOperate)){
- storageOperate = OPERATE_MAP.get(ResUploadType.HDFS);
- }
- return storageOperate;
- }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 8ac51c67ba..ec4286e8a1 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -30,14 +30,16 @@ import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
import java.io.*;
import java.nio.charset.StandardCharsets;
@@ -50,13 +52,15 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
/**
* hadoop utils
* single instance
*/
-@Component
public class HadoopUtils implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
index 8c316c158e..d253392cdf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
@@ -69,7 +69,7 @@ public class PropertyUtils {
*/
public static boolean getResUploadStartupState() {
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
- ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
+ ResUploadType resUploadType = ResUploadType.valueOf(StringUtils.isEmpty(resUploadStartupType) ? ResUploadType.NONE.name() : resUploadStartupType);
return resUploadType != ResUploadType.NONE;
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
index 80fb464661..13e17c3bba 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
@@ -24,7 +24,11 @@ import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.*;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
@@ -36,17 +40,30 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.jets3t.service.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
+import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
+import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
-@Component
public class S3Utils implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(S3Utils.class);
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java
new file mode 100644
index 0000000000..6a670c9b61
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.exception;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExceptionTest {
+
+ @Test
+ public void testException(){
+ final String message = "Test";
+ RuntimeException time = new RuntimeException(message);
+
+ Assert.assertNull(new BaseException().getMessage());
+ Assert.assertNotNull(new BaseException(message).getMessage());
+ Assert.assertNotNull(new BaseException(message, time).getMessage());
+ Assert.assertNotNull(new BaseException(time).getCause());
+ Assert.assertNotNull(new BaseException(message, time, false, false).getMessage());
+
+ Assert.assertNull(new StorageOperateNoConfiguredException().getMessage());
+ Assert.assertNotNull(new StorageOperateNoConfiguredException(message).getMessage());
+ Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time).getMessage());
+ Assert.assertNotNull(new StorageOperateNoConfiguredException(time).getCause());
+ Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time, false, false).getMessage());
+ }
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java
deleted file mode 100644
index 05f2e45a73..0000000000
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.storage;
-
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.EnumMap;
-
-/**
- * @author StorageOperateManagerTest
- */
-@RunWith(MockitoJUnitRunner.class)
-public class StorageOperateManagerTest {
-
- @Mock
- private HadoopUtils hadoopUtils;
-
- @Test
- public void testManager() {
- StorageOperateManager mock = Mockito.mock(StorageOperateManager.class);
- Assert.assertNotNull(mock);
-
- EnumMap<ResUploadType, StorageOperate> storageOperateMap = StorageOperateManager.OPERATE_MAP;
- storageOperateMap.put(ResUploadType.HDFS, hadoopUtils);
-
- StorageOperate storageOperate = StorageOperateManager.getStorageOperate(ResUploadType.HDFS);
- Assert.assertNotNull(storageOperate);
- }
-}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
index 14279b33d5..df8050af41 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.spi.enums.ResUploadType;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
@@ -28,4 +30,9 @@ public class PropertyUtilsTest {
public void getString() {
assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
}
+
+ @Test
+ public void getResUploadStartupState(){
+ Assert.assertFalse(PropertyUtils.getResUploadStartupState());
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 0d2e4880aa..a376587aad 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -18,8 +18,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.common.storage.StorageOperateManager;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -163,7 +161,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
// submit task to manager
- boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, StorageOperateManager.getStorageOperate(ResUploadType.HDFS)));
+ boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index b58675b899..cc15eb62f4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -44,11 +43,7 @@ import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -115,13 +110,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService,
- TaskPluginManager taskPluginManager,
- StorageOperate storageOperate) {
+ TaskPluginManager taskPluginManager) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
- this.storageOperate = storageOperate;
}
@Override
@@ -147,7 +140,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
// copy hdfs/minio file to local
- downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
+ List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
+ if (!fileDownloads.isEmpty()){
+ downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
+ }
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@@ -277,34 +273,49 @@ public class TaskExecuteThread implements Runnable, Delayed {
* download resource file
*
* @param execLocalPath execLocalPath
- * @param projectRes projectRes
+ * @param fileDownloads pairs of (resource full name, tenant code) to download
* @param logger logger
*/
- private void downloadResource(String execLocalPath, Map<String, String> projectRes, Logger logger) {
- if (MapUtils.isEmpty(projectRes)) {
- return;
+ public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
+ for (Pair<String, String> fileDownload : fileDownloads) {
+ try {
+ // query the tenant code of the resource according to the name of the resource
+ String fullName = fileDownload.getLeft();
+ String tenantCode = fileDownload.getRight();
+ String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
+ logger.info("get resource file from hdfs :{}", resHdfsPath);
+ storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new ServiceException(e.getMessage());
+ }
}
+ }
- Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
-
- for (Map.Entry<String, String> resource : resEntries) {
- String fullName = resource.getKey();
- String tenantCode = resource.getValue();
- File resFile = new File(execLocalPath, fullName);
- if (!resFile.exists()) {
- try {
- // query the tenant code of the resource according to the name of the resource
- String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
- logger.info("get resource file from hdfs :{}", resHdfsPath);
- storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new ServiceException(e.getMessage());
- }
- } else {
+ /**
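+ * Determines which resources are missing from the local execution directory and still need downloading; throws StorageOperateNoConfiguredException if any are missing while no resource storage is configured.
+ * @param execLocalPath local execution directory in which each resource file is looked up
+ * @param projectRes map of resource full name to tenant code
+ * @return pairs of (resource full name, tenant code) whose file does not exist under execLocalPath; an empty list when projectRes is empty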
+ */
+ public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes){
+ if (MapUtils.isEmpty(projectRes)) {
+ return Collections.emptyList();
+ }
+ List<Pair<String, String>> downloadFile = new ArrayList<>();
+ projectRes.forEach((key, value) -> {
+ File resFile = new File(execLocalPath, key);
+ boolean notExist = !resFile.exists();
+ if (notExist){
+ downloadFile.add(Pair.of(key, value));
+ } else{
logger.info("file : {} exists ", resFile.getName());
}
+ });
+ if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){
+ throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
}
+ return downloadFile;
}
/**
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
new file mode 100644
index 0000000000..f847690a6c
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(PowerMockRunner.class)
+public class TaskExecuteThreadTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
+
+ @Mock
+ private TaskExecutionContext taskExecutionContext;
+
+ @Mock
+ private TaskCallbackService taskCallbackService;
+
+ @Mock
+ private AlertClientService alertClientService;
+
+ @Mock
+ private TaskPluginManager taskPluginManager;
+
+ @Test
+ public void checkTest(){
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);
+
+ String path = "/";
+ Map<String, String> projectRes = new HashMap<>();
+ projectRes.put("shell", "shell.sh");
+ List<Pair<String, String>> downloads = new ArrayList<>();
+ try{
+ downloads = taskExecuteThread.downloadCheck(path, projectRes);
+ }catch (Exception e){
+ Assert.assertNotNull(e);
+ }
+ downloads.add(Pair.of("shell", "shell.sh"));
+ try{
+ taskExecuteThread.downloadResource(path, LOGGER, downloads);
+ }catch (Exception e){
+
+ }
+ }
+}