You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/03 11:09:00 UTC
[shardingsphere-elasticjob-lite] branch master updated: Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap (#925)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push:
new f64f373 Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap (#925)
f64f373 is described below
commit f64f373b6efbb793cc11a16a568765b3c29e1d91
Author: Liang Zhang <te...@163.com>
AuthorDate: Fri Jul 3 19:08:53 2020 +0800
Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap (#925)
* Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap
* Move JobScheduler to scheduler package
* Fix test cases
* Fix test cases
---
.../lite/api/bootstrap/JobBootstrap.java | 149 +--------------------
.../lite/api/bootstrap/OneOffJobBootstrap.java | 20 ++-
.../lite/api/bootstrap/ScheduleJobBootstrap.java | 22 ++-
.../JobScheduler.java} | 36 ++---
.../lite/integrate/BaseIntegrateTest.java | 14 +-
.../lite/integrate/DisabledJobIntegrateTest.java | 4 +
...ingDataflowElasticJobForExecuteFailureTest.java | 56 --------
...lowElasticJobForExecuteThrowsExceptionTest.java | 56 --------
...ngDataflowElasticJobForMultipleThreadsTest.java | 56 --------
...reamingDataflowElasticJobForNotMonitorTest.java | 57 --------
10 files changed, 66 insertions(+), 404 deletions(-)
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
index 70624b9..db32fb5 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
@@ -17,150 +17,13 @@
package org.apache.shardingsphere.elasticjob.lite.api.bootstrap;
-import lombok.Getter;
-import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
-import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
-import org.apache.shardingsphere.elasticjob.lite.handler.sharding.JobInstance;
-import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobShutdownHookPlugin;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.LiteJob;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.SchedulerFacade;
-import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade;
-import org.apache.shardingsphere.elasticjob.lite.job.TypedJobFactory;
-import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.simpl.SimpleThreadPool;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
/**
- * Schedule job bootstrap.
+ * Job bootstrap.
*/
-public abstract class JobBootstrap {
-
- private static final String REG_CENTER_DATA_MAP_KEY = "regCenter";
-
- private static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
-
- private static final String JOB_CONFIG_DATA_MAP_KEY = "jobConfig";
-
- private static final String JOB_LISTENERS_DATA_MAP_KEY = "elasticJobListeners";
-
- private static final String TRACING_CONFIG_DATA_MAP_KEY = "tracingConfig";
-
- private final CoordinatorRegistryCenter regCenter;
-
- private final ElasticJob elasticJob;
-
- @Getter
- private final JobConfiguration jobConfig;
-
- private final List<ElasticJobListener> elasticJobListeners;
-
- private final TracingConfiguration tracingConfig;
-
- private final SetUpFacade setUpFacade;
-
- private final SchedulerFacade schedulerFacade;
-
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
- this(regCenter, elasticJob, jobConfig, null, elasticJobListeners);
- }
-
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
- final ElasticJobListener... elasticJobListeners) {
- this.regCenter = regCenter;
- this.elasticJob = elasticJob;
- this.elasticJobListeners = Arrays.asList(elasticJobListeners);
- this.tracingConfig = tracingConfig;
- setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
- schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
- this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJob.getClass().getName(), jobConfig);
- setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
- }
-
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
- this(regCenter, elasticJobType, jobConfig, null, elasticJobListeners);
- }
-
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
- final ElasticJobListener... elasticJobListeners) {
- this(regCenter, TypedJobFactory.createJobInstance(elasticJobType, jobConfig.getProps()), jobConfig, tracingConfig, elasticJobListeners);
- }
-
- private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
- GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName());
- for (ElasticJobListener each : elasticJobListeners) {
- if (each instanceof AbstractDistributeOnceElasticJobListener) {
- ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
- }
- }
- }
-
- protected final JobScheduleController createJobScheduleController() {
- JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
- JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
- registerStartUpInfo();
- return result;
- }
-
- private void registerStartUpInfo() {
- JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
- JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
- JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
- setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
- }
-
- private Scheduler createScheduler() {
- Scheduler result;
- try {
- StdSchedulerFactory factory = new StdSchedulerFactory();
- factory.initialize(getQuartzProps());
- result = factory.getScheduler();
- result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
- } catch (final SchedulerException ex) {
- throw new JobSystemException(ex);
- }
- return result;
- }
-
- private Properties getQuartzProps() {
- Properties result = new Properties();
- result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
- result.put("org.quartz.threadPool.threadCount", "1");
- result.put("org.quartz.scheduler.instanceName", getJobConfig().getJobName());
- result.put("org.quartz.jobStore.misfireThreshold", "1");
- result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
- result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
- return result;
- }
-
- private JobDetail createJobDetail() {
- JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(getJobConfig().getJobName()).build();
- result.getJobDataMap().put(REG_CENTER_DATA_MAP_KEY, regCenter);
- result.getJobDataMap().put(JOB_CONFIG_DATA_MAP_KEY, getJobConfig());
- result.getJobDataMap().put(JOB_LISTENERS_DATA_MAP_KEY, elasticJobListeners);
- result.getJobDataMap().put(TRACING_CONFIG_DATA_MAP_KEY, tracingConfig);
- result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJob);
- return result;
- }
+public interface JobBootstrap {
- /**
- * Shutdown job.
- */
- public final void shutdown() {
- schedulerFacade.shutdownInstance();
- }
+ /**
+ * Shutdown job.
+ */
+ void shutdown();
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java
index 053212a..1c1e840 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java
@@ -21,35 +21,43 @@ import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.scheduler.JobScheduler;
import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
/**
* One off job bootstrap.
*/
-public final class OneOffJobBootstrap extends JobBootstrap {
+public final class OneOffJobBootstrap implements JobBootstrap {
+
+ private final JobScheduler jobScheduler;
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJob, jobConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, elasticJobListeners);
}
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
}
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJobType, jobConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, elasticJobListeners);
}
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
}
/**
* Execute job.
*/
public void execute() {
- createJobScheduleController().executeJob();
+ jobScheduler.getJobScheduleController().executeJob();
+ }
+
+ @Override
+ public void shutdown() {
+ jobScheduler.shutdown();
}
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java
index 5cf47d3..1f3e2c4 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java
@@ -23,36 +23,44 @@ import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.scheduler.JobScheduler;
import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
/**
* Schedule job bootstrap.
*/
-public final class ScheduleJobBootstrap extends JobBootstrap {
+public final class ScheduleJobBootstrap implements JobBootstrap {
+
+ private final JobScheduler jobScheduler;
public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJob, jobConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, elasticJobListeners);
}
public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
}
public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJobType, jobConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, elasticJobListeners);
}
public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
final ElasticJobListener... elasticJobListeners) {
- super(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
+ jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
}
/**
* Schedule job.
*/
public void schedule() {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(getJobConfig().getCron()), "Cron can not be empty.");
- createJobScheduleController().scheduleJob(getJobConfig().getCron());
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron can not be empty.");
+ jobScheduler.getJobScheduleController().scheduleJob(jobScheduler.getJobConfig().getCron());
+ }
+
+ @Override
+ public void shutdown() {
+ jobScheduler.shutdown();
}
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/scheduler/JobScheduler.java
similarity index 92%
copy from elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
copy to elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/scheduler/JobScheduler.java
index 70624b9..188cb24 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/scheduler/JobScheduler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.api.bootstrap;
+package org.apache.shardingsphere.elasticjob.lite.scheduler;
import lombok.Getter;
import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
@@ -46,9 +46,9 @@ import java.util.List;
import java.util.Properties;
/**
- * Schedule job bootstrap.
+ * Job scheduler.
*/
-public abstract class JobBootstrap {
+public final class JobScheduler {
private static final String REG_CENTER_DATA_MAP_KEY = "regCenter";
@@ -75,11 +75,14 @@ public abstract class JobBootstrap {
private final SchedulerFacade schedulerFacade;
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
+ @Getter
+ private final JobScheduleController jobScheduleController;
+
+ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
this(regCenter, elasticJob, jobConfig, null, elasticJobListeners);
}
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
+ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
final ElasticJobListener... elasticJobListeners) {
this.regCenter = regCenter;
this.elasticJob = elasticJob;
@@ -89,13 +92,14 @@ public abstract class JobBootstrap {
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJob.getClass().getName(), jobConfig);
setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
+ jobScheduleController = createJobScheduleController();
}
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
+ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
this(regCenter, elasticJobType, jobConfig, null, elasticJobListeners);
}
- public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
+ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
final ElasticJobListener... elasticJobListeners) {
this(regCenter, TypedJobFactory.createJobInstance(elasticJobType, jobConfig.getProps()), jobConfig, tracingConfig, elasticJobListeners);
}
@@ -109,20 +113,13 @@ public abstract class JobBootstrap {
}
}
- protected final JobScheduleController createJobScheduleController() {
+ private JobScheduleController createJobScheduleController() {
JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
registerStartUpInfo();
return result;
}
- private void registerStartUpInfo() {
- JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
- JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
- JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
- setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
- }
-
private Scheduler createScheduler() {
Scheduler result;
try {
@@ -157,10 +154,17 @@ public abstract class JobBootstrap {
return result;
}
+ private void registerStartUpInfo() {
+ JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
+ JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
+ JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
+ setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
+ }
+
/**
* Shutdown job.
*/
- public final void shutdown() {
+ public void shutdown() {
schedulerFacade.shutdownInstance();
}
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java
index 58a6a0d..c8eeff9 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java
@@ -58,7 +58,7 @@ public abstract class BaseIntegrateTest {
@Getter(AccessLevel.PROTECTED)
private final JobConfiguration jobConfiguration;
- private final JobBootstrap bootstrap;
+ private final JobBootstrap jobBootstrap;
private final LeaderService leaderService;
@@ -67,7 +67,7 @@ public abstract class BaseIntegrateTest {
protected BaseIntegrateTest(final TestType type, final ElasticJob elasticJob) {
jobConfiguration = getJobConfiguration(jobName);
- bootstrap = createJobBootstrap(type, elasticJob);
+ jobBootstrap = createJobBootstrap(type, elasticJob);
leaderService = new LeaderService(regCenter, jobName);
}
@@ -94,16 +94,16 @@ public abstract class BaseIntegrateTest {
@Before
public void setUp() {
regCenter.init();
- if (bootstrap instanceof ScheduleJobBootstrap) {
- ((ScheduleJobBootstrap) bootstrap).schedule();
+ if (jobBootstrap instanceof ScheduleJobBootstrap) {
+ ((ScheduleJobBootstrap) jobBootstrap).schedule();
} else {
- ((OneOffJobBootstrap) bootstrap).execute();
+ ((OneOffJobBootstrap) jobBootstrap).execute();
}
}
@After
public void tearDown() {
- bootstrap.shutdown();
+ jobBootstrap.shutdown();
ReflectionUtils.setFieldValue(JobRegistry.getInstance(), "instance", null);
}
@@ -121,7 +121,7 @@ public abstract class BaseIntegrateTest {
assertThat(JobRegistry.getInstance().getJobInstance(jobName).getIp(), is(IpUtils.getIp()));
JobConfiguration jobConfig = YamlEngine.unmarshal(regCenter.get("/" + jobName + "/config"), YamlJobConfiguration.class).toJobConfiguration();
assertThat(jobConfig.getShardingTotalCount(), is(3));
- if (bootstrap instanceof ScheduleJobBootstrap) {
+ if (jobBootstrap instanceof ScheduleJobBootstrap) {
assertThat(jobConfig.getCron(), is("0/1 * * * * ?"));
} else {
assertNull(jobConfig.getCron());
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java
index c71982b..92cbd06 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.integrate;
import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.simple.FooSimpleElasticJob;
+import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -37,5 +38,8 @@ public abstract class DisabledJobIntegrateTest extends BaseIntegrateTest {
@Test
public final void assertJobRunning() {
assertRegCenterCommonInfoWithDisabled();
+ while (!FooSimpleElasticJob.isCompleted()) {
+ BlockUtils.waitingShortTime();
+ }
}
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteFailureTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteFailureTest.java
deleted file mode 100644
index 44aee6e..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteFailureTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJobForExecuteFailure;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForExecuteFailureTest extends EnabledJobIntegrateTest {
-
- public StreamingDataflowElasticJobForExecuteFailureTest() {
- super(TestType.ONE_OFF, new StreamingDataflowElasticJobForExecuteFailure());
- }
-
- @Before
- @After
- public void reset() {
- StreamingDataflowElasticJobForExecuteFailure.reset();
- }
-
- @Override
- protected JobConfiguration getJobConfiguration(final String jobName) {
- return JobConfiguration.newBuilder(jobName, 3)
- .shardingItemParameters("0=A,1=B,2=C").overwrite(true).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
- }
-
- @Test
- public void assertJobInit() {
- while (!StreamingDataflowElasticJobForExecuteFailure.isCompleted()) {
- BlockUtils.waitingShortTime();
- }
- assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
- }
-}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteThrowsExceptionTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteThrowsExceptionTest.java
deleted file mode 100644
index a0c0900..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteThrowsExceptionTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJobForExecuteThrowsException;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForExecuteThrowsExceptionTest extends EnabledJobIntegrateTest {
-
- public StreamingDataflowElasticJobForExecuteThrowsExceptionTest() {
- super(TestType.ONE_OFF, new StreamingDataflowElasticJobForExecuteThrowsException());
- }
-
- @Before
- @After
- public void reset() {
- StreamingDataflowElasticJobForExecuteThrowsException.reset();
- }
-
- @Override
- protected JobConfiguration getJobConfiguration(final String jobName) {
- return JobConfiguration.newBuilder(jobName, 3)
- .shardingItemParameters("0=A,1=B,2=C").jobErrorHandlerType("IGNORE").overwrite(true).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
- }
-
- @Test
- public void assertJobInit() {
- while (!StreamingDataflowElasticJobForExecuteThrowsException.isCompleted()) {
- BlockUtils.waitingShortTime();
- }
- assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
- }
-}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForMultipleThreadsTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForMultipleThreadsTest.java
deleted file mode 100644
index b6e88ef..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForMultipleThreadsTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForMultipleThreadsTest extends EnabledJobIntegrateTest {
-
- public StreamingDataflowElasticJobForMultipleThreadsTest() {
- super(TestType.ONE_OFF, new StreamingDataflowElasticJob());
- }
-
- @Before
- @After
- public void reset() {
- StreamingDataflowElasticJob.reset();
- }
-
- @Override
- protected JobConfiguration getJobConfiguration(final String jobName) {
- return JobConfiguration.newBuilder(jobName, 3)
- .shardingItemParameters("0=A,1=B,2=C").overwrite(true).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
- }
-
- @Test
- public void assertJobInit() {
- while (!StreamingDataflowElasticJob.isCompleted()) {
- BlockUtils.waitingShortTime();
- }
- assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
- }
-}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForNotMonitorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForNotMonitorTest.java
deleted file mode 100644
index 365defa..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForNotMonitorTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForNotMonitorTest extends EnabledJobIntegrateTest {
-
- public StreamingDataflowElasticJobForNotMonitorTest() {
- super(TestType.ONE_OFF, new StreamingDataflowElasticJob());
- }
-
- @Before
- @After
- public void reset() {
- StreamingDataflowElasticJob.reset();
- }
-
- @Override
- protected JobConfiguration getJobConfiguration(final String jobName) {
- return JobConfiguration.newBuilder(jobName, 3)
- .shardingItemParameters("0=A,1=B,2=C").monitorExecution(false).overwrite(true)
- .setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
- }
-
- @Test
- public void assertJobInit() {
- while (!StreamingDataflowElasticJob.isCompleted()) {
- BlockUtils.waitingShortTime();
- }
- assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
- }
-}