You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/03 11:09:00 UTC

[shardingsphere-elasticjob-lite] branch master updated: Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap (#925)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git


The following commit(s) were added to refs/heads/master by this push:
     new f64f373  Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap (#925)
f64f373 is described below

commit f64f373b6efbb793cc11a16a568765b3c29e1d91
Author: Liang Zhang <te...@163.com>
AuthorDate: Fri Jul 3 19:08:53 2020 +0800

    Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap (#925)
    
    * Decouple JobBootstrap with OneOffJobBootstrap and ScheduleJobBootstrap
    
    * Move JobScheduler to scheduler package
    
    * Fix test cases
    
    * Fix test cases
---
 .../lite/api/bootstrap/JobBootstrap.java           | 149 +--------------------
 .../lite/api/bootstrap/OneOffJobBootstrap.java     |  20 ++-
 .../lite/api/bootstrap/ScheduleJobBootstrap.java   |  22 ++-
 .../JobScheduler.java}                             |  36 ++---
 .../lite/integrate/BaseIntegrateTest.java          |  14 +-
 .../lite/integrate/DisabledJobIntegrateTest.java   |   4 +
 ...ingDataflowElasticJobForExecuteFailureTest.java |  56 --------
 ...lowElasticJobForExecuteThrowsExceptionTest.java |  56 --------
 ...ngDataflowElasticJobForMultipleThreadsTest.java |  56 --------
 ...reamingDataflowElasticJobForNotMonitorTest.java |  57 --------
 10 files changed, 66 insertions(+), 404 deletions(-)

diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
index 70624b9..db32fb5 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
@@ -17,150 +17,13 @@
 
 package org.apache.shardingsphere.elasticjob.lite.api.bootstrap;
 
-import lombok.Getter;
-import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
-import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
-import org.apache.shardingsphere.elasticjob.lite.handler.sharding.JobInstance;
-import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobShutdownHookPlugin;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.LiteJob;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.SchedulerFacade;
-import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade;
-import org.apache.shardingsphere.elasticjob.lite.job.TypedJobFactory;
-import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.simpl.SimpleThreadPool;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
 /**
- * Schedule job bootstrap.
+ * Job bootstrap.
  */
-public abstract class JobBootstrap {
-    
-    private static final String REG_CENTER_DATA_MAP_KEY = "regCenter";
-    
-    private static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
-    
-    private static final String JOB_CONFIG_DATA_MAP_KEY = "jobConfig";
-    
-    private static final String JOB_LISTENERS_DATA_MAP_KEY = "elasticJobListeners";
-    
-    private static final String TRACING_CONFIG_DATA_MAP_KEY = "tracingConfig";
-    
-    private final CoordinatorRegistryCenter regCenter;
-    
-    private final ElasticJob elasticJob;
-    
-    @Getter
-    private final JobConfiguration jobConfig;
-    
-    private final List<ElasticJobListener> elasticJobListeners;
-    
-    private final TracingConfiguration tracingConfig;
-    
-    private final SetUpFacade setUpFacade;
-    
-    private final SchedulerFacade schedulerFacade;
-    
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        this(regCenter, elasticJob, jobConfig, null, elasticJobListeners);
-    }
-    
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
-                        final ElasticJobListener... elasticJobListeners) {
-        this.regCenter = regCenter;
-        this.elasticJob = elasticJob;
-        this.elasticJobListeners = Arrays.asList(elasticJobListeners);
-        this.tracingConfig = tracingConfig;
-        setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
-        schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
-        this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJob.getClass().getName(), jobConfig);
-        setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
-    }
-    
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        this(regCenter, elasticJobType, jobConfig, null, elasticJobListeners);
-    }
-    
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
-                        final ElasticJobListener... elasticJobListeners) {
-        this(regCenter, TypedJobFactory.createJobInstance(elasticJobType, jobConfig.getProps()), jobConfig, tracingConfig, elasticJobListeners);
-    }
-    
-    private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
-        GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName());
-        for (ElasticJobListener each : elasticJobListeners) {
-            if (each instanceof AbstractDistributeOnceElasticJobListener) {
-                ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
-            }
-        }
-    }
-    
-    protected final JobScheduleController createJobScheduleController() {
-        JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
-        JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
-        registerStartUpInfo();
-        return result;
-    }
-    
-    private void registerStartUpInfo() {
-        JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
-        JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
-        JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
-        setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
-    }
-    
-    private Scheduler createScheduler() {
-        Scheduler result;
-        try {
-            StdSchedulerFactory factory = new StdSchedulerFactory();
-            factory.initialize(getQuartzProps());
-            result = factory.getScheduler();
-            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
-        } catch (final SchedulerException ex) {
-            throw new JobSystemException(ex);
-        }
-        return result;
-    }
-    
-    private Properties getQuartzProps() {
-        Properties result = new Properties();
-        result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
-        result.put("org.quartz.threadPool.threadCount", "1");
-        result.put("org.quartz.scheduler.instanceName", getJobConfig().getJobName());
-        result.put("org.quartz.jobStore.misfireThreshold", "1");
-        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
-        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
-        return result;
-    }
-    
-    private JobDetail createJobDetail() {
-        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(getJobConfig().getJobName()).build();
-        result.getJobDataMap().put(REG_CENTER_DATA_MAP_KEY, regCenter);
-        result.getJobDataMap().put(JOB_CONFIG_DATA_MAP_KEY, getJobConfig());
-        result.getJobDataMap().put(JOB_LISTENERS_DATA_MAP_KEY, elasticJobListeners);
-        result.getJobDataMap().put(TRACING_CONFIG_DATA_MAP_KEY, tracingConfig);
-        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJob);
-        return result;
-    }
+public interface JobBootstrap {
     
-   /**
-    * Shutdown job.
-    */
-    public final void shutdown() { 
-        schedulerFacade.shutdownInstance();
-    }
+    /**
+     * Shutdown job.
+     */
+    void shutdown();
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java
index 053212a..1c1e840 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/OneOffJobBootstrap.java
@@ -21,35 +21,43 @@ import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.scheduler.JobScheduler;
 import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 
 /**
  * One off job bootstrap.
  */
-public final class OneOffJobBootstrap extends JobBootstrap {
+public final class OneOffJobBootstrap implements JobBootstrap {
+    
+    private final JobScheduler jobScheduler;
     
     public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJob, jobConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, elasticJobListeners);
     }
     
     public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                               final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
     }
     
     public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJobType, jobConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, elasticJobListeners);
     }
     
     public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                               final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
     }
     
     /**
      * Execute job.
      */
     public void execute() {
-        createJobScheduleController().executeJob();
+        jobScheduler.getJobScheduleController().executeJob();
+    }
+    
+    @Override
+    public void shutdown() {
+        jobScheduler.shutdown();
     }
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java
index 5cf47d3..1f3e2c4 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/ScheduleJobBootstrap.java
@@ -23,36 +23,44 @@ import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.scheduler.JobScheduler;
 import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 
 /**
  * Schedule job bootstrap.
  */
-public final class ScheduleJobBootstrap extends JobBootstrap {
+public final class ScheduleJobBootstrap implements JobBootstrap {
+    
+    private final JobScheduler jobScheduler;
     
     public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJob, jobConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, elasticJobListeners);
     }
     
     public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                                 final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, tracingConfig, elasticJobListeners);
     }
     
     public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJobType, jobConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, elasticJobListeners);
     }
     
     public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                                 final ElasticJobListener... elasticJobListeners) {
-        super(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
+        jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig, tracingConfig, elasticJobListeners);
     }
     
     /**
      * Schedule job.
      */
     public void schedule() {
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(getJobConfig().getCron()), "Cron can not be empty.");
-        createJobScheduleController().scheduleJob(getJobConfig().getCron());
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron can not be empty.");
+        jobScheduler.getJobScheduleController().scheduleJob(jobScheduler.getJobConfig().getCron());
+    }
+    
+    @Override
+    public void shutdown() {
+        jobScheduler.shutdown();
     }
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/scheduler/JobScheduler.java
similarity index 92%
copy from elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
copy to elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/scheduler/JobScheduler.java
index 70624b9..188cb24 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/JobBootstrap.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/scheduler/JobScheduler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.api.bootstrap;
+package org.apache.shardingsphere.elasticjob.lite.scheduler;
 
 import lombok.Getter;
 import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
@@ -46,9 +46,9 @@ import java.util.List;
 import java.util.Properties;
 
 /**
- * Schedule job bootstrap.
+ * Job scheduler.
  */
-public abstract class JobBootstrap {
+public final class JobScheduler {
     
     private static final String REG_CENTER_DATA_MAP_KEY = "regCenter";
     
@@ -75,11 +75,14 @@ public abstract class JobBootstrap {
     
     private final SchedulerFacade schedulerFacade;
     
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
+    @Getter
+    private final JobScheduleController jobScheduleController;
+    
+    public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
         this(regCenter, elasticJob, jobConfig, null, elasticJobListeners);
     }
     
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
+    public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                         final ElasticJobListener... elasticJobListeners) {
         this.regCenter = regCenter;
         this.elasticJob = elasticJob;
@@ -89,13 +92,14 @@ public abstract class JobBootstrap {
         schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
         this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJob.getClass().getName(), jobConfig);
         setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
+        jobScheduleController = createJobScheduleController();
     }
     
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
+    public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
         this(regCenter, elasticJobType, jobConfig, null, elasticJobListeners);
     }
     
-    public JobBootstrap(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
+    public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elasticJobType, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                         final ElasticJobListener... elasticJobListeners) {
         this(regCenter, TypedJobFactory.createJobInstance(elasticJobType, jobConfig.getProps()), jobConfig, tracingConfig, elasticJobListeners);
     }
@@ -109,20 +113,13 @@ public abstract class JobBootstrap {
         }
     }
     
-    protected final JobScheduleController createJobScheduleController() {
+    private JobScheduleController createJobScheduleController() {
         JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
         JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
         registerStartUpInfo();
         return result;
     }
     
-    private void registerStartUpInfo() {
-        JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
-        JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
-        JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
-        setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
-    }
-    
     private Scheduler createScheduler() {
         Scheduler result;
         try {
@@ -157,10 +154,17 @@ public abstract class JobBootstrap {
         return result;
     }
     
+    private void registerStartUpInfo() {
+        JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
+        JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
+        JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
+        setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
+    }
+    
    /**
     * Shutdown job.
     */
-    public final void shutdown() { 
+    public void shutdown() {
         schedulerFacade.shutdownInstance();
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java
index 58a6a0d..c8eeff9 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/BaseIntegrateTest.java
@@ -58,7 +58,7 @@ public abstract class BaseIntegrateTest {
     @Getter(AccessLevel.PROTECTED)
     private final JobConfiguration jobConfiguration;
     
-    private final JobBootstrap bootstrap;
+    private final JobBootstrap jobBootstrap;
     
     private final LeaderService leaderService;
     
@@ -67,7 +67,7 @@ public abstract class BaseIntegrateTest {
     
     protected BaseIntegrateTest(final TestType type, final ElasticJob elasticJob) {
         jobConfiguration = getJobConfiguration(jobName);
-        bootstrap = createJobBootstrap(type, elasticJob);
+        jobBootstrap = createJobBootstrap(type, elasticJob);
         leaderService = new LeaderService(regCenter, jobName);
     }
     
@@ -94,16 +94,16 @@ public abstract class BaseIntegrateTest {
     @Before
     public void setUp() {
         regCenter.init();
-        if (bootstrap instanceof ScheduleJobBootstrap) {
-            ((ScheduleJobBootstrap) bootstrap).schedule();
+        if (jobBootstrap instanceof ScheduleJobBootstrap) {
+            ((ScheduleJobBootstrap) jobBootstrap).schedule();
         } else {
-            ((OneOffJobBootstrap) bootstrap).execute();
+            ((OneOffJobBootstrap) jobBootstrap).execute();
         }
     }
     
     @After
     public void tearDown() {
-        bootstrap.shutdown();
+        jobBootstrap.shutdown();
         ReflectionUtils.setFieldValue(JobRegistry.getInstance(), "instance", null);
     }
     
@@ -121,7 +121,7 @@ public abstract class BaseIntegrateTest {
         assertThat(JobRegistry.getInstance().getJobInstance(jobName).getIp(), is(IpUtils.getIp()));
         JobConfiguration jobConfig = YamlEngine.unmarshal(regCenter.get("/" + jobName + "/config"), YamlJobConfiguration.class).toJobConfiguration();
         assertThat(jobConfig.getShardingTotalCount(), is(3));
-        if (bootstrap instanceof ScheduleJobBootstrap) {
+        if (jobBootstrap instanceof ScheduleJobBootstrap) {
             assertThat(jobConfig.getCron(), is("0/1 * * * * ?"));
         } else {
             assertNull(jobConfig.getCron());
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java
index c71982b..92cbd06 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/DisabledJobIntegrateTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.integrate;
 
 import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.simple.FooSimpleElasticJob;
+import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,5 +38,8 @@ public abstract class DisabledJobIntegrateTest extends BaseIntegrateTest {
     @Test
     public final void assertJobRunning() {
         assertRegCenterCommonInfoWithDisabled();
+        while (!FooSimpleElasticJob.isCompleted()) {
+            BlockUtils.waitingShortTime();
+        }
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteFailureTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteFailureTest.java
deleted file mode 100644
index 44aee6e..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteFailureTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJobForExecuteFailure;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForExecuteFailureTest extends EnabledJobIntegrateTest {
-    
-    public StreamingDataflowElasticJobForExecuteFailureTest() {
-        super(TestType.ONE_OFF, new StreamingDataflowElasticJobForExecuteFailure());
-    }
-    
-    @Before
-    @After
-    public void reset() {
-        StreamingDataflowElasticJobForExecuteFailure.reset();
-    }
-    
-    @Override
-    protected JobConfiguration getJobConfiguration(final String jobName) {
-        return JobConfiguration.newBuilder(jobName, 3)
-                .shardingItemParameters("0=A,1=B,2=C").overwrite(true).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
-    }
-    
-    @Test
-    public void assertJobInit() {
-        while (!StreamingDataflowElasticJobForExecuteFailure.isCompleted()) {
-            BlockUtils.waitingShortTime();
-        }
-        assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
-    }
-}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteThrowsExceptionTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteThrowsExceptionTest.java
deleted file mode 100644
index a0c0900..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForExecuteThrowsExceptionTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJobForExecuteThrowsException;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForExecuteThrowsExceptionTest extends EnabledJobIntegrateTest {
-    
-    public StreamingDataflowElasticJobForExecuteThrowsExceptionTest() {
-        super(TestType.ONE_OFF, new StreamingDataflowElasticJobForExecuteThrowsException());
-    }
-    
-    @Before
-    @After
-    public void reset() {
-        StreamingDataflowElasticJobForExecuteThrowsException.reset();
-    }
-    
-    @Override
-    protected JobConfiguration getJobConfiguration(final String jobName) {
-        return JobConfiguration.newBuilder(jobName, 3)
-                .shardingItemParameters("0=A,1=B,2=C").jobErrorHandlerType("IGNORE").overwrite(true).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
-    }
-    
-    @Test
-    public void assertJobInit() {
-        while (!StreamingDataflowElasticJobForExecuteThrowsException.isCompleted()) {
-            BlockUtils.waitingShortTime();
-        }
-        assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
-    }
-}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForMultipleThreadsTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForMultipleThreadsTest.java
deleted file mode 100644
index b6e88ef..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForMultipleThreadsTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForMultipleThreadsTest extends EnabledJobIntegrateTest {
-    
-    public StreamingDataflowElasticJobForMultipleThreadsTest() {
-        super(TestType.ONE_OFF, new StreamingDataflowElasticJob());
-    }
-    
-    @Before
-    @After
-    public void reset() {
-        StreamingDataflowElasticJob.reset();
-    }
-    
-    @Override
-    protected JobConfiguration getJobConfiguration(final String jobName) {
-        return JobConfiguration.newBuilder(jobName, 3)
-                .shardingItemParameters("0=A,1=B,2=C").overwrite(true).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
-    }
-    
-    @Test
-    public void assertJobInit() {
-        while (!StreamingDataflowElasticJob.isCompleted()) {
-            BlockUtils.waitingShortTime();
-        }
-        assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
-    }
-}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForNotMonitorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForNotMonitorTest.java
deleted file mode 100644
index 365defa..0000000
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/assertion/enable/oneoff/dataflow/StreamingDataflowElasticJobForNotMonitorTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.lite.integrate.assertion.enable.oneoff.dataflow;
-
-import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.integrate.EnabledJobIntegrateTest;
-import org.apache.shardingsphere.elasticjob.lite.integrate.fixture.dataflow.StreamingDataflowElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public final class StreamingDataflowElasticJobForNotMonitorTest extends EnabledJobIntegrateTest {
-    
-    public StreamingDataflowElasticJobForNotMonitorTest() {
-        super(TestType.ONE_OFF, new StreamingDataflowElasticJob());
-    }
-    
-    @Before
-    @After
-    public void reset() {
-        StreamingDataflowElasticJob.reset();
-    }
-    
-    @Override
-    protected JobConfiguration getJobConfiguration(final String jobName) {
-        return JobConfiguration.newBuilder(jobName, 3)
-                .shardingItemParameters("0=A,1=B,2=C").monitorExecution(false).overwrite(true)
-                .setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build();
-    }
-    
-    @Test
-    public void assertJobInit() {
-        while (!StreamingDataflowElasticJob.isCompleted()) {
-            BlockUtils.waitingShortTime();
-        }
-        assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
-    }
-}