You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/10/18 01:15:54 UTC
[dolphinscheduler] 01/02: Remove the DataxTaskTest class because there is no junit5 package.
This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit b4f59af9d9f1d43d36fca09ed7a43450cde43e5f
Author: zhuangchong <zh...@163.com>
AuthorDate: Tue Oct 18 09:15:06 2022 +0800
Remove the DataxTaskTest class because there is no junit5 package.
---
.../plugin/task/datax/DataxTaskTest.java | 327 ---------------------
1 file changed, 327 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
deleted file mode 100644
index 636ba839c0..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.datax;
-
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
-import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
-import org.apache.dolphinscheduler.plugin.task.api.TaskException;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
-import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
-import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.nio.file.Files;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-@ExtendWith(MockitoExtension.class)
-public class DataxTaskTest {
-
- private DataxTask dataxTask;
-
- private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {};
-
- @BeforeEach
- public void before() throws Exception {
- TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
- ResourceParametersHelper resourceParametersHelper = new ResourceParametersHelper();
- String parameters = JSONUtils.toJsonString(createDataxParameters());
- when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
- taskExecutionContext.setResourceParametersHelper(resourceParametersHelper);
- this.dataxTask = new DataxTask(taskExecutionContext);
- this.dataxTask.init();
- }
-
- @Test
- public void testHandleNullParamsMap() throws Exception {
- String parameters = JSONUtils.toJsonString(createDataxParameters());
- TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
- taskExecutionContext.setPrepareParamsMap(null);
- taskExecutionContext.setTaskParams(parameters);
- DataxTask dataxTask = new DataxTask(taskExecutionContext);
- dataxTask.init();
-
- ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
- Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
- shellCommandExecutorFiled.setAccessible(true);
- shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
-
- TaskResponse taskResponse = new TaskResponse();
- taskResponse.setStatus(TaskRunStatus.SUCCESS);
- taskResponse.setExitStatusCode(0);
- taskResponse.setProcessId(1);
- when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
-
- dataxTask.handle(taskCallBack);
- Assertions.assertEquals(0, dataxTask.getExitStatusCode());
-
- File jsonFile = new File("/tmp/execution/app-id_job.json");
- InputStream json = Files.newInputStream(jsonFile.toPath());
- String resultStr = FileUtils.readFile2Str(json);
- Assertions.assertEquals(resultStr, getJsonString());
- boolean delete = jsonFile.delete();
- Assertions.assertTrue(delete);
-
- File shellCommandFile = SystemUtils.IS_OS_WINDOWS ?
- new File("/tmp/execution/app-id_node.bat") :
- new File("/tmp/execution/app-id_node.sh");
- InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
- String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
- Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
- " /tmp/execution/app-id_job.json");
- delete = shellCommandFile.delete();
- Assertions.assertTrue(delete);
- }
-
- @Test
- public void testHandleParamsMap() throws Exception {
- String parameters = JSONUtils.toJsonString(createDataxParameters());
- TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
-
- taskExecutionContext.setPrepareParamsMap(createPrepareParamsMap());
- taskExecutionContext.setTaskParams(parameters);
- DataxTask dataxTask = new DataxTask(taskExecutionContext);
- dataxTask.init();
-
- ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
- Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
- shellCommandExecutorFiled.setAccessible(true);
- shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
-
- TaskResponse taskResponse = new TaskResponse();
- taskResponse.setStatus(TaskRunStatus.SUCCESS);
- taskResponse.setExitStatusCode(0);
- taskResponse.setProcessId(1);
- when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
-
- dataxTask.handle(taskCallBack);
- Assertions.assertEquals(0, dataxTask.getExitStatusCode());
-
- File jsonFile = new File("/tmp/execution/app-id_job.json");
- InputStream json = Files.newInputStream(jsonFile.toPath());
- String resultStr = FileUtils.readFile2Str(json);
- Assertions.assertEquals(resultStr, getJsonString());
- boolean delete = jsonFile.delete();
- Assertions.assertTrue(delete);
-
- File shellCommandFile = SystemUtils.IS_OS_WINDOWS ?
- new File("/tmp/execution/app-id_node.bat") :
- new File("/tmp/execution/app-id_node.sh");
- InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
- String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
- Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
- "-p \"-DDT=DT -DDS=DS\" /tmp/execution/app-id_job.json");
- delete = shellCommandFile.delete();
- Assertions.assertTrue(delete);
- }
-
- @Test
- public void testHandleInterruptedException() throws Exception {
- String parameters = JSONUtils.toJsonString(createDataxParameters());
- TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
- taskExecutionContext.setPrepareParamsMap(null);
- taskExecutionContext.setTaskParams(parameters);
- DataxTask dataxTask = new DataxTask(taskExecutionContext);
- dataxTask.init();
-
- ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
- Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
- shellCommandExecutorFiled.setAccessible(true);
- shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
-
- when(shellCommandExecutor.run(anyString())).thenThrow(new InterruptedException("Command execution failed"));
- Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
- }
-
- @Test
- public void testHandleIOException() throws Exception {
- String parameters = JSONUtils.toJsonString(createDataxParameters());
- TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
- taskExecutionContext.setPrepareParamsMap(null);
- taskExecutionContext.setTaskParams(parameters);
- DataxTask dataxTask = new DataxTask(taskExecutionContext);
- dataxTask.init();
-
- ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
- Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
- shellCommandExecutorFiled.setAccessible(true);
- shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
-
- when(shellCommandExecutor.run(anyString())).thenThrow(new IOException("Command execution failed"));
- Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
- }
-
- @Test
- public void testTryExecuteSqlResolveColumnNames() throws Exception {
- BaseConnectionParam baseConnectionParam = mock(BaseConnectionParam.class);
- try (
- MockedStatic<DataSourceClientProvider> mockedStaticDataSourceClientProvider =
- mockStatic(DataSourceClientProvider.class)) {
- DataSourceClientProvider clientProvider = mock(DataSourceClientProvider.class);
- when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
- mockedStaticDataSourceClientProvider.when(DataSourceClientProvider::getInstance).thenReturn(clientProvider);
-
- Connection connection = mock(Connection.class);
- when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
-
- PreparedStatement stmt = mock(PreparedStatement.class);
- when(connection.prepareStatement(anyString())).thenReturn(stmt);
-
- ResultSetMetaData md = mock(ResultSetMetaData.class);
- when(md.getColumnCount()).thenReturn(1);
- when(md.getColumnName(eq(1))).thenReturn("something");
-
- ResultSet resultSet = mock(ResultSet.class);
- when(resultSet.getMetaData()).thenReturn(md);
- when(stmt.executeQuery()).thenReturn(resultSet);
-
- String[] rows = this.dataxTask.tryExecuteSqlResolveColumnNames(DbType.MYSQL,baseConnectionParam, "");
- Assertions.assertEquals(rows.length, 1);
- Assertions.assertEquals(rows[0], "something");
-
- when(connection.prepareStatement(anyString())).thenThrow(new SQLException("Connection failed"));
- String[] nullRows = this.dataxTask.tryExecuteSqlResolveColumnNames(DbType.MYSQL,baseConnectionParam, "");
- Assertions.assertNull(nullRows);
- }
- }
-
- @Test
- public void testGetPythonCommand() {
- Assertions.assertEquals(dataxTask.getPythonCommand(""), "python2.7");
- Assertions.assertEquals(dataxTask.getPythonCommand("/bin/python"), "/bin/python2.7");
-
- String pythonCommand = dataxTask.getPythonCommand("/opt/python");
- pythonCommand = pythonCommand.replace(File.separator, "/");
- Assertions.assertEquals(pythonCommand, "/opt/python/bin/python2.7");
- }
-
- @Test
- public void testLoadJvmEnv() {
- DataxParameters dataXParameters = createDataxParameters();
- dataXParameters.setXms(3);
- dataXParameters.setXmx(4);
- Assertions.assertEquals(dataxTask.loadJvmEnv(dataXParameters), " --jvm=\"-Xms3G -Xmx4G\" ");
- }
-
- private DataxParameters createDataxParameters() {
- DataxParameters dataxParameters = new DataxParameters();
- dataxParameters.setCustomConfig(1);
- dataxParameters.setDsType("mysql");
- dataxParameters.setDataSource(1);
- dataxParameters.setJson(getJsonString());
- dataxParameters.setDataTarget(2);
- dataxParameters.setSql("SELECT count(*) FROM table");
- dataxParameters.setTargetTable("user.name");
- return dataxParameters;
- }
-
- private Map<String, Property> createPrepareParamsMap() {
- Map<String, Property> paramsMap = new HashMap<>();
- Property dtProperty = new Property();
- dtProperty.setProp("DT");
- dtProperty.setDirect(Direct.IN);
- dtProperty.setType(DataType.VARCHAR);
- dtProperty.setValue("DT");
- Property dsProperty = new Property();
- dsProperty.setProp("DS");
- dsProperty.setDirect(Direct.IN);
- dsProperty.setType(DataType.VARCHAR);
- dsProperty.setValue("DS");
- paramsMap.put("DT", dtProperty);
- paramsMap.put("DS", dsProperty);
- return paramsMap;
- }
-
- private TaskExecutionContext buildTestTaskExecutionContext() {
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
- taskExecutionContext.setTaskAppId("app-id");
- taskExecutionContext.setExecutePath("/tmp/execution");
- return taskExecutionContext;
- }
-
- private String getJsonString() {
- return "{\n" +
- " \"job\": {\n" +
- " \"content\": [\n" +
- " {\n" +
- " \"reader\": {\n" +
- " \"name\": \"stream reader\",\n" +
- " \"parameter\": {\n" +
- " \"sliceRecordCount\": 10,\n" +
- " \"column\": [\n" +
- " {\n" +
- " \"type\": \"long\",\n" +
- " \"value\": \"10\"\n" +
- " },\n" +
- " {\n" +
- " \"type\": \"string\",\n" +
- " \"value\": \"Hello DataX\"\n" +
- " }\n" +
- " ]\n" +
- " }\n" +
- " },\n" +
- " \"writer\": {\n" +
- " \"name\": \"stream writer\",\n" +
- " \"parameter\": {\n" +
- " \"encoding\": \"UTF-8\",\n" +
- " \"print\": true\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- " ],\n" +
- " \"setting\": {\n" +
- " \"speed\": {\n" +
- " \"channel\": 5\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- "}";
- }
-}
\ No newline at end of file