You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/10/23 04:40:03 UTC
[shardingsphere-elasticjob] branch master updated: Move
OneOffJobBootstrap.triggerAllInstances into InstanceService (#1636)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 32db930 Move OneOffJobBootstrap.triggerAllInstances into InstanceService (#1636)
32db930 is described below
commit 32db930857dbea3fa20bb1ed0f3a075323519e4b
Author: wwj <22...@qq.com>
AuthorDate: Fri Oct 23 12:37:28 2020 +0800
Move OneOffJobBootstrap.triggerAllInstances into InstanceService (#1636)
* Move OneOffJobBootstrap.triggerAllInstances into InstanceService (#1610).
* use lambda
* correct review problem
---
.../api/bootstrap/impl/OneOffJobBootstrap.java | 20 ++--
.../lite/internal/instance/InstanceNode.java | 8 +-
.../lite/internal/instance/InstanceService.java | 23 ++---
.../api/bootstrap/impl/OneOffJobBootstrapTest.java | 114 +++++++++++++++++++++
.../lite/internal/instance/InstanceNodeTest.java | 7 +-
.../internal/instance/InstanceServiceTest.java | 12 ++-
6 files changed, 155 insertions(+), 29 deletions(-)
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
index 9f7ab57..a98a33e 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrap.java
@@ -22,9 +22,8 @@ import com.google.common.base.Strings;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
-import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceOperation;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
-import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
/**
@@ -34,28 +33,25 @@ public final class OneOffJobBootstrap implements JobBootstrap {
private final JobScheduler jobScheduler;
+ private final InstanceService instanceService;
+
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
+ Preconditions.checkArgument(Strings.isNullOrEmpty(jobConfig.getCron()), "Cron should be empty.");
jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
+ instanceService = new InstanceService(regCenter, jobConfig.getJobName());
}
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig) {
+ Preconditions.checkArgument(Strings.isNullOrEmpty(jobConfig.getCron()), "Cron should be empty.");
jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig);
+ instanceService = new InstanceService(regCenter, jobConfig.getJobName());
}
/**
* Execute job.
*/
public void execute() {
- Preconditions.checkArgument(Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron should be empty.");
- triggerAllInstances();
- }
-
- private void triggerAllInstances() {
- CoordinatorRegistryCenter regCenter = jobScheduler.getRegCenter();
- JobNodePath jobNodePath = new JobNodePath(jobScheduler.getJobConfig().getJobName());
- for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
- regCenter.persist(jobNodePath.getInstanceNodePath(each), InstanceOperation.TRIGGER.name());
- }
+ instanceService.triggerAllInstances();
}
@Override
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
index 13e569b..b26c586 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
@@ -61,7 +61,11 @@ public final class InstanceNode {
return path.equals(jobNodePath.getFullPath(String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())));
}
- String getLocalInstanceNode() {
- return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+ String getLocalInstancePath() {
+ return getInstancePath(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+ }
+
+ String getInstancePath(final String instanceId) {
+ return String.format(INSTANCES, instanceId);
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
index 36c6ddd..315d670 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
@@ -46,21 +46,18 @@ public final class InstanceService {
* Persist job online status.
*/
public void persistOnline() {
- jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
+ jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstancePath(), "");
}
/**
* Persist job instance.
*/
public void removeInstance() {
- jobNodeStorage.removeJobNodeIfExisted(instanceNode.getLocalInstanceNode());
+ jobNodeStorage.removeJobNodeIfExisted(instanceNode.getLocalInstancePath());
}
- /**
- * Clear trigger flag.
- */
- public void clearTriggerFlag() {
- jobNodeStorage.updateJobNode(instanceNode.getLocalInstanceNode(), "");
+ void clearTriggerFlag() {
+ jobNodeStorage.updateJobNode(instanceNode.getLocalInstancePath(), "");
}
/**
@@ -79,12 +76,14 @@ public final class InstanceService {
return result;
}
+ boolean isLocalJobInstanceExisted() {
+ return jobNodeStorage.isJobNodeExisted(instanceNode.getLocalInstancePath());
+ }
+
/**
- * Judge is job instance existed or not in localhost.
- *
- * @return is job instance existed or not in localhost
+ * Trigger all instances.
*/
- public boolean isLocalJobInstanceExisted() {
- return jobNodeStorage.isJobNodeExisted(instanceNode.getLocalInstanceNode());
+ public void triggerAllInstances() {
+ jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT).forEach(each -> jobNodeStorage.replaceJobNode(instanceNode.getInstancePath(each), InstanceOperation.TRIGGER.name()));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java
new file mode 100644
index 0000000..f34b2c7
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class OneOffJobBootstrapTest {
+
+ private static final ZookeeperConfiguration ZOOKEEPER_CONFIGURATION = new ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), OneOffJobBootstrapTest.class.getSimpleName());
+
+ private static final int SHARDING_TOTAL_COUNT = 3;
+
+ private ZookeeperRegistryCenter zkRegCenter;
+
+ @BeforeClass
+ public static void init() {
+ EmbedTestingServer.start();
+ }
+
+ @Before
+ public void setUp() {
+ zkRegCenter = new ZookeeperRegistryCenter(ZOOKEEPER_CONFIGURATION);
+ zkRegCenter.init();
+ }
+
+ @After
+ public void teardown() {
+ zkRegCenter.close();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertConfigFailedWithCron() {
+ new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
+ }, JobConfiguration.newBuilder("test_one_off_job_execute_with_config_cron", SHARDING_TOTAL_COUNT).cron("0/5 * * * * ?").build());
+ }
+
+ @Test
+ public void assertExecute() {
+ AtomicInteger counter = new AtomicInteger(0);
+ final OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
+ counter.incrementAndGet();
+ }, JobConfiguration.newBuilder("test_one_off_job_execute", SHARDING_TOTAL_COUNT).build());
+ oneOffJobBootstrap.execute();
+ blockUtilFinish(oneOffJobBootstrap);
+ assertThat(counter.get(), is(SHARDING_TOTAL_COUNT));
+ getJobScheduler(oneOffJobBootstrap).shutdown();
+ }
+
+ @Test
+ public void assertShutdown() throws SchedulerException {
+ OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
+ }, JobConfiguration.newBuilder("test_one_off_job_shutdown", SHARDING_TOTAL_COUNT).build());
+ oneOffJobBootstrap.shutdown();
+ assertTrue(getScheduler(oneOffJobBootstrap).isShutdown());
+ }
+
+ @SneakyThrows
+ private JobScheduler getJobScheduler(final OneOffJobBootstrap oneOffJobBootstrap) {
+ Field field = OneOffJobBootstrap.class.getDeclaredField("jobScheduler");
+ field.setAccessible(true);
+ return (JobScheduler) field.get(oneOffJobBootstrap);
+ }
+
+ @SneakyThrows
+ private Scheduler getScheduler(final OneOffJobBootstrap oneOffJobBootstrap) {
+ JobScheduler jobScheduler = getJobScheduler(oneOffJobBootstrap);
+ Field schedulerField = JobScheduleController.class.getDeclaredField("scheduler");
+ schedulerField.setAccessible(true);
+ return (Scheduler) schedulerField.get(jobScheduler.getJobScheduleController());
+ }
+
+ @SneakyThrows
+ private void blockUtilFinish(final OneOffJobBootstrap oneOffJobBootstrap) {
+ Scheduler scheduler = getScheduler(oneOffJobBootstrap);
+ while (!scheduler.isStarted() || !scheduler.getCurrentlyExecutingJobs().isEmpty()) {
+ Thread.sleep(100);
+ }
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java
index cac04bd..cf94637 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNodeTest.java
@@ -64,6 +64,11 @@ public final class InstanceNodeTest {
@Test
public void assertGetLocalInstancePath() {
- assertThat(instanceNode.getLocalInstanceNode(), is("instances/127.0.0.1@-@0"));
+ assertThat(instanceNode.getLocalInstancePath(), is("instances/127.0.0.1@-@0"));
+ }
+
+ @Test
+ public void assertGetInstancePath() {
+ assertThat(instanceNode.getInstancePath("127.0.0.1@-@0"), is("instances/127.0.0.1@-@0"));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
index 4092ad8..e5030f6 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
@@ -73,12 +73,12 @@ public final class InstanceServiceTest {
@Test
public void assertClearTriggerFlag() {
instanceService.clearTriggerFlag();
- jobNodeStorage.updateJobNode("instances/127.0.0.1@-@0", "");
+ verify(jobNodeStorage).updateJobNode("instances/127.0.0.1@-@0", "");
}
@Test
public void assertGetAvailableJobInstances() {
- when(jobNodeStorage.getJobNodeChildrenKeys("instances")).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
+ when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
assertThat(instanceService.getAvailableJobInstances(), is(Collections.singletonList(new JobInstance("127.0.0.1@-@0"))));
}
@@ -88,4 +88,12 @@ public final class InstanceServiceTest {
when(jobNodeStorage.isJobNodeExisted("instances/127.0.0.1@-@0")).thenReturn(true);
assertTrue(instanceService.isLocalJobInstanceExisted());
}
+
+ @Test
+ public void assertTriggerAllInstances() {
+ when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
+ instanceService.triggerAllInstances();
+ verify(jobNodeStorage).replaceJobNode("instances/127.0.0.1@-@0", InstanceOperation.TRIGGER.name());
+ verify(jobNodeStorage).replaceJobNode("instances/127.0.0.2@-@0", InstanceOperation.TRIGGER.name());
+ }
}