You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/15 04:40:36 UTC

[shardingsphere-elasticjob] branch master updated: Trigger one-off job via Zookeeper (#1457)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d87e3  Trigger one-off job via Zookeeper (#1457)
f3d87e3 is described below

commit f3d87e322fb481f4dca69750a4fb52b525ebf029
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Tue Sep 15 12:40:27 2020 +0800

    Trigger one-off job via Zookeeper (#1457)
    
    * Trigger one-off job via Zookeeper
    
    * Refactor trigger method name
---
 .../api/bootstrap/impl/OneOffJobBootstrap.java     | 12 +++++++-
 .../internal/schedule/JobScheduleController.java   | 34 +++++++++-------------
 .../lite/internal/schedule/JobScheduler.java       |  1 +
 .../schedule/JobScheduleControllerTest.java        | 19 ++++++++++--
 4 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
index 9749d4a..84737c5 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.elasticjob.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.api.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceOperation;
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
 import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
@@ -57,7 +59,15 @@ public final class OneOffJobBootstrap implements JobBootstrap {
      */
     public void execute() {
         Preconditions.checkArgument(Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron should be empty.");
-        jobScheduler.getJobScheduleController().executeJob();
+        triggerAllInstances();
+    }
+    
+    private void triggerAllInstances() {
+        CoordinatorRegistryCenter regCenter = jobScheduler.getRegCenter();
+        JobNodePath jobNodePath = new JobNodePath(jobScheduler.getJobConfig().getJobName());
+        for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+            regCenter.persist(jobNodePath.getInstanceNodePath(each), InstanceOperation.TRIGGER.name());
+        }
     }
     
     @Override
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java
index dcfbcac..7677675 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java
@@ -43,26 +43,6 @@ public final class JobScheduleController {
     private final String triggerIdentity;
     
     /**
-     * Execute job.
-     */
-    public void executeJob() {
-        try {
-            if (!scheduler.checkExists(jobDetail.getKey())) {
-                scheduler.scheduleJob(jobDetail, createOneOffTrigger());
-                scheduler.start();
-            } else {
-                scheduler.triggerJob(jobDetail.getKey());
-            }
-        } catch (final SchedulerException ex) {
-            throw new JobSystemException(ex);
-        }
-    }
-    
-    private Trigger createOneOffTrigger() {
-        return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(SimpleScheduleBuilder.simpleSchedule()).build();
-    }
-    
-    /**
      * Schedule job.
      * 
      * @param cron CRON expression
@@ -156,14 +136,26 @@ public final class JobScheduleController {
      */
     public synchronized void triggerJob() {
         try {
-            if (!scheduler.isShutdown()) {
+            if (scheduler.isShutdown()) {
+                return;
+            }
+            if (!scheduler.checkExists(jobDetail.getKey())) {
+                scheduler.scheduleJob(jobDetail, createOneOffTrigger());
+            } else {
                 scheduler.triggerJob(jobDetail.getKey());
             }
+            if (!scheduler.isStarted()) {
+                scheduler.start();
+            }
         } catch (final SchedulerException ex) {
             throw new JobSystemException(ex);
         }
     }
     
+    private Trigger createOneOffTrigger() {
+        return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(SimpleScheduleBuilder.simpleSchedule()).build();
+    }
+    
     /**
      * Shutdown scheduler.
      */
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index 64b40d9..89575e9 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -48,6 +48,7 @@ public final class JobScheduler {
     
     private static final String JOB_EXECUTOR_DATA_MAP_KEY = "jobExecutor";
     
+    @Getter
     private final CoordinatorRegistryCenter regCenter;
     
     private final ElasticJob elasticJob;
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java
index 4c6803d..f6b4b45 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java
@@ -156,13 +156,14 @@ public final class JobScheduleControllerTest {
     public void assertTriggerJobFailure() throws SchedulerException {
         JobKey jobKey = new JobKey("test_job");
         when(jobDetail.getKey()).thenReturn(jobKey);
+        when(scheduler.checkExists(jobKey)).thenReturn(true);
         doThrow(SchedulerException.class).when(scheduler).triggerJob(jobKey);
         ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
         ReflectionUtils.setFieldValue(jobScheduleController, "jobDetail", jobDetail);
         try {
             jobScheduleController.triggerJob();
         } finally {
-            verify(jobDetail).getKey();
+            verify(jobDetail, times(2)).getKey();
             verify(scheduler).triggerJob(jobKey);
         }
     }
@@ -171,14 +172,28 @@ public final class JobScheduleControllerTest {
     public void assertTriggerJobSuccess() throws SchedulerException {
         JobKey jobKey = new JobKey("test_job");
         when(jobDetail.getKey()).thenReturn(jobKey);
+        when(scheduler.checkExists(any(JobKey.class))).thenReturn(true);
         ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
         ReflectionUtils.setFieldValue(jobScheduleController, "jobDetail", jobDetail);
         jobScheduleController.triggerJob();
-        verify(jobDetail).getKey();
+        verify(jobDetail, times(2)).getKey();
         verify(scheduler).triggerJob(jobKey);
     }
     
     @Test
+    public void assertTriggerOneOffJobSuccess() throws SchedulerException {
+        JobKey jobKey = new JobKey("test_job");
+        when(jobDetail.getKey()).thenReturn(jobKey);
+        when(scheduler.checkExists(jobDetail.getKey())).thenReturn(false);
+        ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
+        ReflectionUtils.setFieldValue(jobScheduleController, "jobDetail", jobDetail);
+        jobScheduleController.triggerJob();
+        verify(jobDetail, times(2)).getKey();
+        verify(scheduler).scheduleJob(eq(jobDetail), any(Trigger.class));
+        verify(scheduler).start();
+    }
+    
+    @Test
     public void assertShutdownJobIfShutdown() throws SchedulerException {
         ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
         when(scheduler.isShutdown()).thenReturn(true);