You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2023/02/28 01:31:16 UTC
[shardingsphere-elasticjob] branch master updated: Adjust JobScheduler init sequence to avoid use wrong ElasticJobListener (#2187)
This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 19d158010 Adjust JobScheduler init sequence to avoid use wrong ElasticJobListener (#2187)
19d158010 is described below
commit 19d1580102dba8498105737bb08bdb1c604e3970
Author: wizhuo <46...@users.noreply.github.com>
AuthorDate: Tue Feb 28 09:31:05 2023 +0800
Adjust JobScheduler init sequence to avoid use wrong ElasticJobListener (#2187)
* Adjust JobScheduler init sequence to avoid using the wrong ElasticJobListener when the config overwrite is false
* Adjust code style
* Remove unnecessary methods
---
.../lite/internal/schedule/JobScheduler.java | 30 +++++++++++++---------
.../lite/internal/setup/SetUpFacade.java | 16 ------------
.../lite/internal/setup/SetUpFacadeTest.java | 20 +--------------
3 files changed, 19 insertions(+), 47 deletions(-)
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index 4629b3889..ffee2d7d8 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListenerFactory;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.JobClassNameProviderFactory;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade;
@@ -78,32 +79,37 @@ public final class JobScheduler {
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
this.regCenter = regCenter;
- Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
- setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
- this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
- schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
+ this.jobConfig = setUpJobConfiguration(regCenter, jobClassName, jobConfig);
+ Collection<ElasticJobListener> jobListeners = getElasticJobListeners(this.jobConfig);
+ setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners);
+ schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName());
+ jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
validateJobProperties();
jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
jobScheduleController = createJobScheduleController();
}
-
+
public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(elasticJobType), "Elastic job type cannot be null or empty.");
this.regCenter = regCenter;
- Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
- setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
- this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig);
- schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
+ this.jobConfig = setUpJobConfiguration(regCenter, elasticJobType, jobConfig);
+ Collection<ElasticJobListener> jobListeners = getElasticJobListeners(this.jobConfig);
+ setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners);
+ schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName());
+ jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
validateJobProperties();
jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
jobScheduleController = createJobScheduleController();
}
-
+
+ private JobConfiguration setUpJobConfiguration(final CoordinatorRegistryCenter regCenter, final String jobClassName, final JobConfiguration jobConfig) {
+ ConfigurationService configService = new ConfigurationService(regCenter, jobConfig.getJobName());
+ return configService.setUpJobConfiguration(jobClassName, jobConfig);
+ }
+
private Collection<ElasticJobListener> getElasticJobListeners(final JobConfiguration jobConfig) {
return jobConfig.getJobListenerTypes().stream()
.map(type -> ElasticJobListenerFactory.createListener(type).orElseThrow(() -> new IllegalArgumentException(String.format("Can not find job listener type '%s'.", type))))
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
index 05ed51ff1..0479be285 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
@@ -17,9 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.setup;
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
-import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager;
@@ -34,8 +32,6 @@ import java.util.Collection;
*/
public final class SetUpFacade {
- private final ConfigurationService configService;
-
private final LeaderService leaderService;
private final ServerService serverService;
@@ -47,7 +43,6 @@ public final class SetUpFacade {
private final ListenerManager listenerManager;
public SetUpFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
- configService = new ConfigurationService(regCenter, jobName);
leaderService = new LeaderService(regCenter, jobName);
serverService = new ServerService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
@@ -55,17 +50,6 @@ public final class SetUpFacade {
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}
- /**
- * Set up job configuration.
- *
- * @param jobClassName job class name
- * @param jobConfig job configuration to be updated
- * @return accepted job configuration
- */
- public JobConfiguration setUpJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) {
- return configService.setUpJobConfiguration(jobClassName, jobConfig);
- }
-
/**
* Register start up info.
*
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
index 846a1c4e3..5cd521d25 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.setup;
-import org.apache.shardingsphere.elasticjob.api.ElasticJob;
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
-import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager;
@@ -36,17 +33,12 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class SetUpFacadeTest {
- @Mock
- private ConfigurationService configService;
-
@Mock
private LeaderService leaderService;
@@ -68,23 +60,13 @@ public final class SetUpFacadeTest {
public void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
setUpFacade = new SetUpFacade(null, "test_job", Collections.emptyList());
- ReflectionUtils.setFieldValue(setUpFacade, "configService", configService);
ReflectionUtils.setFieldValue(setUpFacade, "leaderService", leaderService);
ReflectionUtils.setFieldValue(setUpFacade, "serverService", serverService);
ReflectionUtils.setFieldValue(setUpFacade, "instanceService", instanceService);
ReflectionUtils.setFieldValue(setUpFacade, "reconcileService", reconcileService);
ReflectionUtils.setFieldValue(setUpFacade, "listenerManager", listenerManager);
}
-
- @Test
- public void assertSetUpJobConfiguration() {
- JobConfiguration jobConfig = JobConfiguration.newBuilder("test_job", 3)
- .cron("0/1 * * * * ?").setProperty("streaming.process", Boolean.TRUE.toString()).build();
- when(configService.setUpJobConfiguration(ElasticJob.class.getName(), jobConfig)).thenReturn(jobConfig);
- assertThat(setUpFacade.setUpJobConfiguration(ElasticJob.class.getName(), jobConfig), is(jobConfig));
- verify(configService).setUpJobConfiguration(ElasticJob.class.getName(), jobConfig);
- }
-
+
@Test
public void assertRegisterStartUpInfo() {
setUpFacade.registerStartUpInfo(true);