You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/24 02:45:15 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest (#2291)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 1e5a114  add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest (#2291)
1e5a114 is described below

commit 1e5a1140ab39c8edbaef2eaf79357d332141c6c0
Author: Tboy <gu...@immomo.com>
AuthorDate: Tue Mar 24 10:45:04 2020 +0800

    add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest (#2291)
    
    * let quartz use the same datasource
    
    * move master/worker config from dao.properties to each config
    add master/worker registry test
    
    * move mybatis config from application.properties to SpringConnectionFactory
    
    * move mybatis-plus config from application.properties to SpringConnectionFactory
    
    * refactor TaskCallbackService
    
    * add ZookeeperNodeManagerTest
    
    * add NettyExecutorManagerTest
    
    * refactor TaskKillProcessor
    
    * add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest
---
 .../worker/processor/TaskCallbackService.java      |   5 +-
 .../master/dispatch/host/RandomSelectorTest.java   |  52 +++++++++
 .../dispatch/host/RoundRobinSelectorTest.java      |  56 +++++++++
 .../worker/processor/TaskCallbackServiceTest.java  | 130 +++++++++++++++++++++
 .../processor/TaskCallbackServiceTestConfig.java   | 130 +++++++++++++++++++++
 5 files changed, 372 insertions(+), 1 deletion(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index a508177..f966591 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -79,8 +79,11 @@ public class TaskCallbackService {
      * @param taskInstanceId taskInstanceId
      * @return callback channel
      */
-    public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
+    private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
         NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
+        if(nettyRemoteChannel == null){
+            throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first");
+        }
         if(nettyRemoteChannel.isActive()){
             return nettyRemoteChannel;
         }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
new file mode 100644
index 0000000..1d7e03e
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * random selector
+ */
+public class RandomSelectorTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSelectWithIllegalArgumentException(){
+        RandomSelector selector = new RandomSelector();
+        selector.select(Collections.EMPTY_LIST);
+    }
+
+    @Test
+    public void testSelect1(){
+        RandomSelector<String> selector = new RandomSelector();
+        String result = selector.select(Arrays.asList("1"));
+        Assert.assertTrue(StringUtils.isNotEmpty(result));
+        Assert.assertTrue(result.equalsIgnoreCase("1"));
+    }
+
+    @Test
+    public void testSelect(){
+        RandomSelector<Integer> selector = new RandomSelector();
+        int result = selector.select(Arrays.asList(1,2,3,4,5,6,7));
+        Assert.assertTrue(result >= 1 && result <= 7);
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
new file mode 100644
index 0000000..a34e667
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * round robin selector
+ */
+public class RoundRobinSelectorTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSelectWithIllegalArgumentException(){
+        RoundRobinSelector selector = new RoundRobinSelector();
+        selector.select(Collections.EMPTY_LIST);
+    }
+
+    @Test
+    public void testSelect1(){
+        RoundRobinSelector<String> selector = new RoundRobinSelector();
+        String result = selector.select(Arrays.asList("1"));
+        Assert.assertTrue(StringUtils.isNotEmpty(result));
+        Assert.assertTrue(result.equalsIgnoreCase("1"));
+    }
+
+    @Test
+    public void testSelect(){
+        RoundRobinSelector<Integer> selector = new RoundRobinSelector();
+        List<Integer> sources = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
+        int result = selector.select(sources);
+        Assert.assertTrue(result == 1);
+        int result2 = selector.select(Arrays.asList(1,2,3,4,5,6,7));
+        Assert.assertTrue(result2 == 2);
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
new file mode 100644
index 0000000..5f44e1c
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.Date;
+
+/**
+ * test task call back service
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class,
+        ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
+        ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class})
+public class TaskCallbackServiceTest {
+
+    @Autowired
+    private TaskCallbackService taskCallbackService;
+
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    @Test
+    public void testSendAck(){
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(30000);
+        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.start();
+
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
+        taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
+        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
+        ackCommand.setTaskInstanceId(1);
+        ackCommand.setStartTime(new Date());
+        taskCallbackService.sendAck(1, ackCommand.convert2Command());
+
+        nettyRemotingServer.close();
+        nettyRemotingClient.close();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSendAckWithIllegalArgumentException(){
+        TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
+        taskCallbackService.sendAck(1, ackCommand.convert2Command());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSendAckWithIllegalStateException1(){
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(30000);
+        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.start();
+
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
+        taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
+        channel.close();
+        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
+        ackCommand.setTaskInstanceId(1);
+        ackCommand.setStartTime(new Date());
+
+        nettyRemotingServer.close();
+        taskCallbackService.sendAck(1, ackCommand.convert2Command());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSendAckWithIllegalStateException2(){
+        masterRegistry.registry();
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(30000);
+        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.start();
+
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
+        taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
+        channel.close();
+        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
+        ackCommand.setTaskInstanceId(1);
+        ackCommand.setStartTime(new Date());
+
+        nettyRemotingServer.close();
+        taskCallbackService.sendAck(1, ackCommand.convert2Command());
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
new file mode 100644
index 0000000..e6dd8e7
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.mockito.Mockito;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * dependency config
+ */
+@Configuration
+public class TaskCallbackServiceTestConfig {
+
+
+    @Bean
+    public AlertDao alertDao() {
+        return new AlertDao();
+    }
+
+    @Bean
+    public AlertMapper alertMapper() {
+        return Mockito.mock(AlertMapper.class);
+    }
+
+    @Bean
+    public UserAlertGroupMapper userAlertGroupMapper() {
+        return Mockito.mock(UserAlertGroupMapper.class);
+    }
+
+    @Bean
+    public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){
+        return Mockito.mock(TaskInstanceCacheManagerImpl.class);
+    }
+
+    @Bean
+    public ProcessService processService(){
+        return Mockito.mock(ProcessService.class);
+    }
+
+    @Bean
+    public UserMapper userMapper(){
+        return Mockito.mock(UserMapper.class);
+    }
+
+    @Bean
+    public ProcessDefinitionMapper processDefineMapper(){
+        return Mockito.mock(ProcessDefinitionMapper.class);
+    }
+
+    @Bean
+    public ProcessInstanceMapper processInstanceMapper(){
+        return Mockito.mock(ProcessInstanceMapper.class);
+    }
+
+    @Bean
+    public DataSourceMapper dataSourceMapper(){
+        return Mockito.mock(DataSourceMapper.class);
+    }
+
+    @Bean
+    public ProcessInstanceMapMapper processInstanceMapMapper(){
+        return Mockito.mock(ProcessInstanceMapMapper.class);
+    }
+
+    @Bean
+    public TaskInstanceMapper taskInstanceMapper(){
+        return Mockito.mock(TaskInstanceMapper.class);
+    }
+
+    @Bean
+    public CommandMapper commandMapper(){
+        return Mockito.mock(CommandMapper.class);
+    }
+
+    @Bean
+    public ScheduleMapper scheduleMapper(){
+        return Mockito.mock(ScheduleMapper.class);
+    }
+
+    @Bean
+    public UdfFuncMapper udfFuncMapper(){
+        return Mockito.mock(UdfFuncMapper.class);
+    }
+
+    @Bean
+    public ResourceMapper resourceMapper(){
+        return Mockito.mock(ResourceMapper.class);
+    }
+
+    @Bean
+    public WorkerGroupMapper workerGroupMapper(){
+        return Mockito.mock(WorkerGroupMapper.class);
+    }
+
+    @Bean
+    public ErrorCommandMapper errorCommandMapper(){
+        return Mockito.mock(ErrorCommandMapper.class);
+    }
+
+    @Bean
+    public TenantMapper tenantMapper(){
+        return Mockito.mock(TenantMapper.class);
+    }
+
+    @Bean
+    public ProjectMapper projectMapper(){
+        return Mockito.mock(ProjectMapper.class);
+    }
+
+}