You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/15 04:40:36 UTC
[shardingsphere-elasticjob] branch master updated: Trigger one-off job via Zookeeper (#1457)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new f3d87e3 Trigger one-off job via Zookeeper (#1457)
f3d87e3 is described below
commit f3d87e322fb481f4dca69750a4fb52b525ebf029
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Tue Sep 15 12:40:27 2020 +0800
Trigger one-off job via Zookeeper (#1457)
* Trigger one-off job via Zookeeper
* Refactor trigger method name
---
.../api/bootstrap/impl/OneOffJobBootstrap.java | 12 +++++++-
.../internal/schedule/JobScheduleController.java | 34 +++++++++-------------
.../lite/internal/schedule/JobScheduler.java | 1 +
.../schedule/JobScheduleControllerTest.java | 19 ++++++++++--
4 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
index 9749d4a..84737c5 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceOperation;
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
@@ -57,7 +59,15 @@ public final class OneOffJobBootstrap implements JobBootstrap {
*/
public void execute() {
Preconditions.checkArgument(Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron should be empty.");
- jobScheduler.getJobScheduleController().executeJob();
+ triggerAllInstances();
+ }
+
+ private void triggerAllInstances() {
+ CoordinatorRegistryCenter regCenter = jobScheduler.getRegCenter();
+ JobNodePath jobNodePath = new JobNodePath(jobScheduler.getJobConfig().getJobName());
+ for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+ regCenter.persist(jobNodePath.getInstanceNodePath(each), InstanceOperation.TRIGGER.name());
+ }
}
@Override
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java
index dcfbcac..7677675 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleController.java
@@ -43,26 +43,6 @@ public final class JobScheduleController {
private final String triggerIdentity;
/**
- * Execute job.
- */
- public void executeJob() {
- try {
- if (!scheduler.checkExists(jobDetail.getKey())) {
- scheduler.scheduleJob(jobDetail, createOneOffTrigger());
- scheduler.start();
- } else {
- scheduler.triggerJob(jobDetail.getKey());
- }
- } catch (final SchedulerException ex) {
- throw new JobSystemException(ex);
- }
- }
-
- private Trigger createOneOffTrigger() {
- return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(SimpleScheduleBuilder.simpleSchedule()).build();
- }
-
- /**
* Schedule job.
*
* @param cron CRON expression
@@ -156,14 +136,26 @@ public final class JobScheduleController {
*/
public synchronized void triggerJob() {
try {
- if (!scheduler.isShutdown()) {
+ if (scheduler.isShutdown()) {
+ return;
+ }
+ if (!scheduler.checkExists(jobDetail.getKey())) {
+ scheduler.scheduleJob(jobDetail, createOneOffTrigger());
+ } else {
scheduler.triggerJob(jobDetail.getKey());
}
+ if (!scheduler.isStarted()) {
+ scheduler.start();
+ }
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
+ private Trigger createOneOffTrigger() {
+ return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(SimpleScheduleBuilder.simpleSchedule()).build();
+ }
+
/**
* Shutdown scheduler.
*/
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index 64b40d9..89575e9 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -48,6 +48,7 @@ public final class JobScheduler {
private static final String JOB_EXECUTOR_DATA_MAP_KEY = "jobExecutor";
+ @Getter
private final CoordinatorRegistryCenter regCenter;
private final ElasticJob elasticJob;
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java
index 4c6803d..f6b4b45 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduleControllerTest.java
@@ -156,13 +156,14 @@ public final class JobScheduleControllerTest {
public void assertTriggerJobFailure() throws SchedulerException {
JobKey jobKey = new JobKey("test_job");
when(jobDetail.getKey()).thenReturn(jobKey);
+ when(scheduler.checkExists(jobKey)).thenReturn(true);
doThrow(SchedulerException.class).when(scheduler).triggerJob(jobKey);
ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
ReflectionUtils.setFieldValue(jobScheduleController, "jobDetail", jobDetail);
try {
jobScheduleController.triggerJob();
} finally {
- verify(jobDetail).getKey();
+ verify(jobDetail, times(2)).getKey();
verify(scheduler).triggerJob(jobKey);
}
}
@@ -171,14 +172,28 @@ public final class JobScheduleControllerTest {
public void assertTriggerJobSuccess() throws SchedulerException {
JobKey jobKey = new JobKey("test_job");
when(jobDetail.getKey()).thenReturn(jobKey);
+ when(scheduler.checkExists(any(JobKey.class))).thenReturn(true);
ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
ReflectionUtils.setFieldValue(jobScheduleController, "jobDetail", jobDetail);
jobScheduleController.triggerJob();
- verify(jobDetail).getKey();
+ verify(jobDetail, times(2)).getKey();
verify(scheduler).triggerJob(jobKey);
}
@Test
+ public void assertTriggerOneOffJobSuccess() throws SchedulerException {
+ JobKey jobKey = new JobKey("test_job");
+ when(jobDetail.getKey()).thenReturn(jobKey);
+ when(scheduler.checkExists(jobDetail.getKey())).thenReturn(false);
+ ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
+ ReflectionUtils.setFieldValue(jobScheduleController, "jobDetail", jobDetail);
+ jobScheduleController.triggerJob();
+ verify(jobDetail, times(2)).getKey();
+ verify(scheduler).scheduleJob(eq(jobDetail), any(Trigger.class));
+ verify(scheduler).start();
+ }
+
+ @Test
public void assertShutdownJobIfShutdown() throws SchedulerException {
ReflectionUtils.setFieldValue(jobScheduleController, "scheduler", scheduler);
when(scheduler.isShutdown()).thenReturn(true);