You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/24 02:45:15 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: add
RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest (#2291)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 1e5a114 add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest (#2291)
1e5a114 is described below
commit 1e5a1140ab39c8edbaef2eaf79357d332141c6c0
Author: Tboy <gu...@immomo.com>
AuthorDate: Tue Mar 24 10:45:04 2020 +0800
add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest (#2291)
* let quartz use the same datasource
* move master/worker config from dao.properties to each config,
  and add master/worker registry test
* move mybatis config from application.properties to SpringConnectionFactory
* move mybatis-plus config from application.properties to SpringConnectionFactory
* refactor TaskCallbackService
* add ZookeeperNodeManagerTest
* add NettyExecutorManagerTest
* refactor TaskKillProcessor
* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest
---
.../worker/processor/TaskCallbackService.java | 5 +-
.../master/dispatch/host/RandomSelectorTest.java | 52 +++++++++
.../dispatch/host/RoundRobinSelectorTest.java | 56 +++++++++
.../worker/processor/TaskCallbackServiceTest.java | 130 +++++++++++++++++++++
.../processor/TaskCallbackServiceTestConfig.java | 130 +++++++++++++++++++++
5 files changed, 372 insertions(+), 1 deletion(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index a508177..f966591 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -79,8 +79,11 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
- public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
+ private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
+ if(nettyRemoteChannel == null){
+ throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first");
+ }
if(nettyRemoteChannel.isActive()){
return nettyRemoteChannel;
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
new file mode 100644
index 0000000..1d7e03e
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Unit tests for {@link RandomSelector}.
+ */
+public class RandomSelectorTest {
+
+    /**
+     * Selecting from an empty collection is expected to be rejected with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testSelectWithIllegalArgumentException(){
+        // parameterised selector and typed empty list: avoids raw-type / unchecked warnings
+        RandomSelector<String> selector = new RandomSelector<>();
+        selector.select(Collections.<String>emptyList());
+    }
+
+    /**
+     * With exactly one candidate, that candidate must be the one selected.
+     */
+    @Test
+    public void testSelect1(){
+        RandomSelector<String> selector = new RandomSelector<>();
+        String result = selector.select(Arrays.asList("1"));
+        Assert.assertTrue(StringUtils.isNotEmpty(result));
+        // assertEquals reports the actual value on failure, unlike assertTrue(equals...)
+        Assert.assertEquals("1", result);
+    }
+
+    /**
+     * With several candidates, the selected value must be one of them.
+     */
+    @Test
+    public void testSelect(){
+        RandomSelector<Integer> selector = new RandomSelector<>();
+        int result = selector.select(Arrays.asList(1,2,3,4,5,6,7));
+        Assert.assertTrue(result >= 1 && result <= 7);
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
new file mode 100644
index 0000000..a34e667
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link RoundRobinSelector}.
+ */
+public class RoundRobinSelectorTest {
+
+    /**
+     * Selecting from an empty collection is expected to be rejected with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testSelectWithIllegalArgumentException(){
+        // parameterised selector and typed empty list: avoids raw-type / unchecked warnings
+        RoundRobinSelector<String> selector = new RoundRobinSelector<>();
+        selector.select(Collections.<String>emptyList());
+    }
+
+    /**
+     * With exactly one candidate, that candidate must be the one selected.
+     */
+    @Test
+    public void testSelect1(){
+        RoundRobinSelector<String> selector = new RoundRobinSelector<>();
+        String result = selector.select(Arrays.asList("1"));
+        Assert.assertTrue(StringUtils.isNotEmpty(result));
+        // assertEquals reports the actual value on failure, unlike assertTrue(equals...)
+        Assert.assertEquals("1", result);
+    }
+
+    /**
+     * Two consecutive selections on the same selector must yield the first and then
+     * the second candidate.
+     */
+    @Test
+    public void testSelect(){
+        RoundRobinSelector<Integer> selector = new RoundRobinSelector<>();
+        List<Integer> sources = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
+        int result = selector.select(sources);
+        Assert.assertEquals(1, result);
+        // a fresh but equal list is passed on purpose: the test expects the rotation
+        // to continue across calls on the same selector instance
+        int result2 = selector.select(Arrays.asList(1,2,3,4,5,6,7));
+        Assert.assertEquals(2, result2);
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
new file mode 100644
index 0000000..5f44e1c
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.Date;
+
+/**
+ * Tests for {@link TaskCallbackService#sendAck}, driven through a throw-away local
+ * Netty server and client.
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class,
+        ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
+        ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class})
+public class TaskCallbackServiceTest {
+
+    /** port the throw-away ack server listens on */
+    private static final int LISTEN_PORT = 30000;
+
+    /** address the test client connects to; derived from the listen port so the two cannot drift */
+    private static final String SERVER_ADDRESS = "localhost:" + LISTEN_PORT;
+
+    /** task instance id for which the tests register a callback channel */
+    private static final int TASK_INSTANCE_ID = 1;
+
+    /**
+     * task instance id that no test ever registers a channel for.
+     * NOTE(review): the channel map in TaskCallbackService looks like a shared constant,
+     * so reusing TASK_INSTANCE_ID here would presumably make the "missing channel" test
+     * depend on test execution order - confirm against TaskCallbackService.
+     */
+    private static final int UNREGISTERED_TASK_INSTANCE_ID = Integer.MAX_VALUE;
+
+    @Autowired
+    private TaskCallbackService taskCallbackService;
+
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    /**
+     * Happy path: an ack is sent over a live channel registered for the task instance.
+     */
+    @Test
+    public void testSendAck(){
+        NettyRemotingServer nettyRemotingServer = startAckServer();
+        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
+        try {
+            Channel channel = nettyRemotingClient.getChannel(Host.of(SERVER_ADDRESS));
+            taskCallbackService.addRemoteChannel(TASK_INSTANCE_ID, new NettyRemoteChannel(channel, 1));
+            taskCallbackService.sendAck(TASK_INSTANCE_ID, newAckCommand().convert2Command());
+        } finally {
+            // always release the port and the client, even when sendAck fails,
+            // so that the other tests can bind the same port
+            nettyRemotingServer.close();
+            nettyRemotingClient.close();
+        }
+    }
+
+    /**
+     * Sending an ack for a task instance without a registered channel must fail with
+     * {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testSendAckWithIllegalArgumentException(){
+        TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
+        taskCallbackService.sendAck(UNREGISTERED_TASK_INSTANCE_ID, ackCommand.convert2Command());
+    }
+
+    /**
+     * Sending an ack over a channel that has been closed is expected to fail with
+     * {@link IllegalStateException}.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testSendAckWithIllegalStateException1(){
+        sendAckOverClosedChannel();
+    }
+
+    /**
+     * Same as {@link #testSendAckWithIllegalStateException1()}, but with the master
+     * registered through {@link MasterRegistry#registry()} beforehand.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testSendAckWithIllegalStateException2(){
+        masterRegistry.registry();
+        sendAckOverClosedChannel();
+    }
+
+    /**
+     * Registers a channel for the task instance, closes it, shuts the server down and
+     * only then sends the ack, so that the send happens over a dead channel.
+     * The client is closed afterwards regardless of the outcome.
+     */
+    private void sendAckOverClosedChannel(){
+        NettyRemotingServer nettyRemotingServer = startAckServer();
+        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
+        try {
+            try {
+                Channel channel = nettyRemotingClient.getChannel(Host.of(SERVER_ADDRESS));
+                taskCallbackService.addRemoteChannel(TASK_INSTANCE_ID, new NettyRemoteChannel(channel, 1));
+                channel.close();
+            } finally {
+                // the server is closed exactly once, before the ack is sent
+                nettyRemotingServer.close();
+            }
+            taskCallbackService.sendAck(TASK_INSTANCE_ID, newAckCommand().convert2Command());
+        } finally {
+            nettyRemotingClient.close();
+        }
+    }
+
+    /**
+     * Starts a Netty server on {@link #LISTEN_PORT} with a mocked processor for
+     * {@link CommandType#TASK_EXECUTE_ACK}.
+     *
+     * @return the started server; the caller is responsible for closing it
+     */
+    private static NettyRemotingServer startAckServer(){
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(LISTEN_PORT);
+        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.start();
+        return nettyRemotingServer;
+    }
+
+    /**
+     * Builds an ack command for {@link #TASK_INSTANCE_ID} with the current time as start time.
+     *
+     * @return a new ack command
+     */
+    private static TaskExecuteAckCommand newAckCommand(){
+        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
+        ackCommand.setTaskInstanceId(TASK_INSTANCE_ID);
+        ackCommand.setStartTime(new Date());
+        return ackCommand;
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
new file mode 100644
index 0000000..e6dd8e7
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.mockito.Mockito;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring configuration providing the beans that the task callback service test
+ * context depends on. Apart from {@link AlertDao}, which is a real instance, every
+ * bean is a Mockito mock - presumably so the context can be wired without real
+ * persistence infrastructure.
+ */
+@Configuration
+public class TaskCallbackServiceTestConfig {
+
+    /**
+     * Creates a Mockito mock of the requested type.
+     *
+     * @param type class to mock
+     * @param <T> mocked type
+     * @return the mock
+     */
+    private static <T> T mockOf(Class<T> type) {
+        return Mockito.mock(type);
+    }
+
+    // ---- alerting ----
+
+    @Bean
+    public AlertDao alertDao() {
+        // the only non-mocked bean in this configuration
+        final AlertDao dao = new AlertDao();
+        return dao;
+    }
+
+    @Bean
+    public AlertMapper alertMapper() {
+        return mockOf(AlertMapper.class);
+    }
+
+    @Bean
+    public UserAlertGroupMapper userAlertGroupMapper() {
+        return mockOf(UserAlertGroupMapper.class);
+    }
+
+    // ---- services ----
+
+    @Bean
+    public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){
+        return mockOf(TaskInstanceCacheManagerImpl.class);
+    }
+
+    @Bean
+    public ProcessService processService(){
+        return mockOf(ProcessService.class);
+    }
+
+    // ---- mappers ----
+
+    @Bean
+    public UserMapper userMapper(){
+        return mockOf(UserMapper.class);
+    }
+
+    @Bean
+    public ProcessDefinitionMapper processDefineMapper(){
+        return mockOf(ProcessDefinitionMapper.class);
+    }
+
+    @Bean
+    public ProcessInstanceMapper processInstanceMapper(){
+        return mockOf(ProcessInstanceMapper.class);
+    }
+
+    @Bean
+    public DataSourceMapper dataSourceMapper(){
+        return mockOf(DataSourceMapper.class);
+    }
+
+    @Bean
+    public ProcessInstanceMapMapper processInstanceMapMapper(){
+        return mockOf(ProcessInstanceMapMapper.class);
+    }
+
+    @Bean
+    public TaskInstanceMapper taskInstanceMapper(){
+        return mockOf(TaskInstanceMapper.class);
+    }
+
+    @Bean
+    public CommandMapper commandMapper(){
+        return mockOf(CommandMapper.class);
+    }
+
+    @Bean
+    public ScheduleMapper scheduleMapper(){
+        return mockOf(ScheduleMapper.class);
+    }
+
+    @Bean
+    public UdfFuncMapper udfFuncMapper(){
+        return mockOf(UdfFuncMapper.class);
+    }
+
+    @Bean
+    public ResourceMapper resourceMapper(){
+        return mockOf(ResourceMapper.class);
+    }
+
+    @Bean
+    public WorkerGroupMapper workerGroupMapper(){
+        return mockOf(WorkerGroupMapper.class);
+    }
+
+    @Bean
+    public ErrorCommandMapper errorCommandMapper(){
+        return mockOf(ErrorCommandMapper.class);
+    }
+
+    @Bean
+    public TenantMapper tenantMapper(){
+        return mockOf(TenantMapper.class);
+    }
+
+    @Bean
+    public ProjectMapper projectMapper(){
+        return mockOf(ProjectMapper.class);
+    }
+
+}