You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/10/23 04:40:03 UTC

[shardingsphere-elasticjob] branch master updated: Move OneOffJobBootstrap.triggerAllInstances into InstanceService (#1636)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 32db930  Move OneOffJobBootstrap.triggerAllInstances into InstanceService  (#1636)
32db930 is described below

commit 32db930857dbea3fa20bb1ed0f3a075323519e4b
Author: wwj <22...@qq.com>
AuthorDate: Fri Oct 23 12:37:28 2020 +0800

    Move OneOffJobBootstrap.triggerAllInstances into InstanceService  (#1636)
    
    * Move OneOffJobBootstrap.triggerAllInstances into InstanceService (#1610).
    
    * use lambda
    
    * correct review problem
---
 .../api/bootstrap/impl/OneOffJobBootstrap.java     |  20 ++--
 .../lite/internal/instance/InstanceNode.java       |   8 +-
 .../lite/internal/instance/InstanceService.java    |  23 ++---
 .../api/bootstrap/impl/OneOffJobBootstrapTest.java | 114 +++++++++++++++++++++
 .../lite/internal/instance/InstanceNodeTest.java   |   7 +-
 .../internal/instance/InstanceServiceTest.java     |  12 ++-
 6 files changed, 155 insertions(+), 29 deletions(-)

diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
index 9f7ab57..a98a33e 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
@@ -22,9 +22,8 @@ import com.google.common.base.Strings;
 import org.apache.shardingsphere.elasticjob.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
-import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceOperation;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
-import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 
 /**
@@ -34,28 +33,25 @@ public final class OneOffJobBootstrap implements JobBootstrap {
     
     private final JobScheduler jobScheduler;
     
+    private final InstanceService instanceService;
+
     public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
+        Preconditions.checkArgument(Strings.isNullOrEmpty(jobConfig.getCron()), "Cron should be empty.");
         jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
+        instanceService = new InstanceService(regCenter, jobConfig.getJobName());
     }
     
     public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig) {
+        Preconditions.checkArgument(Strings.isNullOrEmpty(jobConfig.getCron()), "Cron should be empty.");
         jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig);
+        instanceService = new InstanceService(regCenter, jobConfig.getJobName());
     }
     
     /**
      * Execute job.
      */
     public void execute() {
-        Preconditions.checkArgument(Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron should be empty.");
-        triggerAllInstances();
-    }
-    
-    private void triggerAllInstances() {
-        CoordinatorRegistryCenter regCenter = jobScheduler.getRegCenter();
-        JobNodePath jobNodePath = new JobNodePath(jobScheduler.getJobConfig().getJobName());
-        for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
-            regCenter.persist(jobNodePath.getInstanceNodePath(each), InstanceOperation.TRIGGER.name());
-        }
+        instanceService.triggerAllInstances();
     }
     
     @Override
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
index 13e569b..b26c586 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
@@ -61,7 +61,11 @@ public final class InstanceNode {
         return path.equals(jobNodePath.getFullPath(String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())));
     }
     
-    String getLocalInstanceNode() {
-        return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+    String getLocalInstancePath() {
+        return getInstancePath(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+    }
+
+    String getInstancePath(final String instanceId) {
+        return String.format(INSTANCES, instanceId);
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
index 36c6ddd..315d670 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
@@ -46,21 +46,18 @@ public final class InstanceService {
      * Persist job online status.
      */
     public void persistOnline() {
-        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
+        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstancePath(), "");
     }
     
     /**
      * Persist job instance.
      */
     public void removeInstance() {
-        jobNodeStorage.removeJobNodeIfExisted(instanceNode.getLocalInstanceNode());
+        jobNodeStorage.removeJobNodeIfExisted(instanceNode.getLocalInstancePath());
     }
     
-    /**
-     * Clear trigger flag.
-     */
-    public void clearTriggerFlag() {
-        jobNodeStorage.updateJobNode(instanceNode.getLocalInstanceNode(), "");
+    void clearTriggerFlag() {
+        jobNodeStorage.updateJobNode(instanceNode.getLocalInstancePath(), "");
     }
     
     /**
@@ -79,12 +76,14 @@ public final class InstanceService {
         return result;
     }
     
+    boolean isLocalJobInstanceExisted() {
+        return jobNodeStorage.isJobNodeExisted(instanceNode.getLocalInstancePath());
+    }
+
     /**
-     * Judge is job instance existed or not in localhost.
-     * 
-     * @return is job instance existed or not in localhost
+     * Trigger all instances.
      */
-    public boolean isLocalJobInstanceExisted() {
-        return jobNodeStorage.isJobNodeExisted(instanceNode.getLocalInstanceNode());
+    public void triggerAllInstances() {
+        jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT).forEach(each -> jobNodeStorage.replaceJobNode(instanceNode.getInstancePath(each), InstanceOperation.TRIGGER.name()));
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java
new file mode 100644
index 0000000..f34b2c7
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class OneOffJobBootstrapTest {
+
+    private static final ZookeeperConfiguration ZOOKEEPER_CONFIGURATION = new ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), OneOffJobBootstrapTest.class.getSimpleName());
+
+    private static final int SHARDING_TOTAL_COUNT = 3;
+
+    private ZookeeperRegistryCenter zkRegCenter;
+
+    @BeforeClass
+    public static void init() {
+        EmbedTestingServer.start();
+    }
+
+    @Before
+    public void setUp() {
+        zkRegCenter = new ZookeeperRegistryCenter(ZOOKEEPER_CONFIGURATION);
+        zkRegCenter.init();
+    }
+
+    @After
+    public void teardown() {
+        zkRegCenter.close();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void assertConfigFailedWithCron() {
+        new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
+        }, JobConfiguration.newBuilder("test_one_off_job_execute_with_config_cron", SHARDING_TOTAL_COUNT).cron("0/5 * * * * ?").build());
+    }
+
+    @Test
+    public void assertExecute() {
+        AtomicInteger counter = new AtomicInteger(0);
+        final OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
+            counter.incrementAndGet();
+        }, JobConfiguration.newBuilder("test_one_off_job_execute", SHARDING_TOTAL_COUNT).build());
+        oneOffJobBootstrap.execute();
+        blockUtilFinish(oneOffJobBootstrap);
+        assertThat(counter.get(), is(SHARDING_TOTAL_COUNT));
+        getJobScheduler(oneOffJobBootstrap).shutdown();
+    }
+
+    @Test
+    public void assertShutdown() throws SchedulerException {
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
+        }, JobConfiguration.newBuilder("test_one_off_job_shutdown", SHARDING_TOTAL_COUNT).build());
+        oneOffJobBootstrap.shutdown();
+        assertTrue(getScheduler(oneOffJobBootstrap).isShutdown());
+    }
+
+    @SneakyThrows
+    private JobScheduler getJobScheduler(final OneOffJobBootstrap oneOffJobBootstrap) {
+        Field field = OneOffJobBootstrap.class.getDeclaredField("jobScheduler");
+        field.setAccessible(true);
+        return (JobScheduler) field.get(oneOffJobBootstrap);
+    }
+
+    @SneakyThrows
+    private Scheduler getScheduler(final OneOffJobBootstrap oneOffJobBootstrap) {
+        JobScheduler jobScheduler = getJobScheduler(oneOffJobBootstrap);
+        Field schedulerField = JobScheduleController.class.getDeclaredField("scheduler");
+        schedulerField.setAccessible(true);
+        return (Scheduler) schedulerField.get(jobScheduler.getJobScheduleController());
+    }
+
+    @SneakyThrows
+    private void blockUtilFinish(final OneOffJobBootstrap oneOffJobBootstrap) {
+        Scheduler scheduler = getScheduler(oneOffJobBootstrap);
+        while (!scheduler.isStarted() || !scheduler.getCurrentlyExecutingJobs().isEmpty()) {
+            Thread.sleep(100);
+        }
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java
index cac04bd..cf94637 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java
@@ -64,6 +64,11 @@ public final class InstanceNodeTest {
     
     @Test
     public void assertGetLocalInstancePath() {
-        assertThat(instanceNode.getLocalInstanceNode(), is("instances/127.0.0.1@-@0"));
+        assertThat(instanceNode.getLocalInstancePath(), is("instances/127.0.0.1@-@0"));
+    }
+
+    @Test
+    public void assertGetInstancePath() {
+        assertThat(instanceNode.getInstancePath("127.0.0.1@-@0"), is("instances/127.0.0.1@-@0"));
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
index 4092ad8..e5030f6 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
@@ -73,12 +73,12 @@ public final class InstanceServiceTest {
     @Test
     public void assertClearTriggerFlag() {
         instanceService.clearTriggerFlag();
-        jobNodeStorage.updateJobNode("instances/127.0.0.1@-@0", "");
+        verify(jobNodeStorage).updateJobNode("instances/127.0.0.1@-@0", "");
     }
     
     @Test
     public void assertGetAvailableJobInstances() {
-        when(jobNodeStorage.getJobNodeChildrenKeys("instances")).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
+        when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
         when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
         assertThat(instanceService.getAvailableJobInstances(), is(Collections.singletonList(new JobInstance("127.0.0.1@-@0"))));
     }
@@ -88,4 +88,12 @@ public final class InstanceServiceTest {
         when(jobNodeStorage.isJobNodeExisted("instances/127.0.0.1@-@0")).thenReturn(true);
         assertTrue(instanceService.isLocalJobInstanceExisted());
     }
+
+    @Test
+    public void assertTriggerAllInstances() {
+        when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
+        instanceService.triggerAllInstances();
+        verify(jobNodeStorage).replaceJobNode("instances/127.0.0.1@-@0", InstanceOperation.TRIGGER.name());
+        verify(jobNodeStorage).replaceJobNode("instances/127.0.0.2@-@0", InstanceOperation.TRIGGER.name());
+    }
 }