You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/28 17:20:11 UTC

[shardingsphere-elasticjob-lite] branch master updated: Redesign ElasticJobExecutor, avoid use JobFacade as input param (#861)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git


The following commit(s) were added to refs/heads/master by this push:
     new cf862dc  Redesign ElasticJobExecutor, avoid use JobFacade as input param (#861)
cf862dc is described below

commit cf862dc4fa4ec40aec3cf8add1129f5eb0a9ff98
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon Jun 29 01:20:01 2020 +0800

    Redesign ElasticJobExecutor, avoid use JobFacade as input param (#861)
    
    * Refactor LiteJobFacade
    
    * Refactor ElasticJobExecutor
    
    * Refactor ElasticJobExecutor
    
    * Remove ElasticJobExecutor.jobName
    
    * Fix test cases
---
 .../elasticjob/lite/api/JobScheduler.java          | 48 ++++++++++++--------
 .../lite/executor/ElasticJobExecutor.java          | 51 +++++++++++++---------
 .../elasticjob/lite/internal/schedule/LiteJob.java | 21 +++++++--
 .../lite/internal/schedule/LiteJobFacade.java      | 11 ++++-
 .../type/impl/DataflowJobExecutorTest.java         | 10 +++--
 .../executor/type/impl/ScriptJobExecutorTest.java  | 21 +++++----
 .../executor/type/impl/SimpleJobExecutorTest.java  | 11 +++--
 .../executor/type/impl/WrongJobExecutorTest.java   | 10 ++++-
 .../lite/internal/schedule/LiteJobFacadeTest.java  |  7 +--
 9 files changed, 125 insertions(+), 65 deletions(-)

diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java
index 2256ecd..0b44bfd 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java
@@ -22,17 +22,14 @@ import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener
 import org.apache.shardingsphere.elasticjob.lite.api.script.ScriptJob;
 import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
-import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.handler.sharding.JobInstance;
 import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobShutdownHookPlugin;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.LiteJob;
-import org.apache.shardingsphere.elasticjob.lite.internal.schedule.LiteJobFacade;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.SchedulerFacade;
 import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.lite.tracing.JobEventBus;
 import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 import org.quartz.JobBuilder;
 import org.quartz.JobDetail;
@@ -50,9 +47,15 @@ import java.util.Properties;
  */
 public final class JobScheduler {
     
+    private static final String REG_CENTER_DATA_MAP_KEY = "regCenter";
+    
     private static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
     
-    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
+    private static final String JOB_CONFIG_DATA_MAP_KEY = "jobConfig";
+    
+    private static final String JOB_LISTENERS_DATA_MAP_KEY = "elasticJobListeners";
+    
+    private static final String TRACING_CONFIG_DATA_MAP_KEY = "tracingConfig";
     
     private final CoordinatorRegistryCenter regCenter;
     
@@ -60,29 +63,33 @@ public final class JobScheduler {
     
     private final JobConfiguration jobConfig;
     
-    private final SchedulerFacade schedulerFacade;
+    private final List<ElasticJobListener> elasticJobListeners;
+    
+    private final TracingConfiguration tracingConfig;
     
-    private final JobFacade jobFacade;
+    private final SchedulerFacade schedulerFacade;
     
     public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
-        this(regCenter, elasticJob, jobConfig, new JobEventBus(), elasticJobListeners);
+        this.regCenter = regCenter;
+        this.elasticJob = elasticJob;
+        this.jobConfig = jobConfig;
+        JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
+        this.elasticJobListeners = Arrays.asList(elasticJobListeners);
+        tracingConfig = null;
+        setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
+        schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
     }
     
     public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration tracingConfig,
                         final ElasticJobListener... elasticJobListeners) {
-        this(regCenter, elasticJob, jobConfig, new JobEventBus(tracingConfig), elasticJobListeners);
-    }
-    
-    private JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob,
-                         final JobConfiguration jobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
         this.regCenter = regCenter;
         this.elasticJob = elasticJob;
         this.jobConfig = jobConfig;
         JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
-        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
-        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
-        schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName(), elasticJobListenerList);
-        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
+        this.elasticJobListeners = Arrays.asList(elasticJobListeners);
+        this.tracingConfig = tracingConfig;
+        setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
+        schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
     }
     
     private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
@@ -100,7 +107,7 @@ public final class JobScheduler {
     public void init() {
         JobConfiguration jobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(null == elasticJob ? ScriptJob.class.getName() : elasticJob.getClass().getName(), jobConfig);
         JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfigFromRegCenter.getJobName(), jobConfigFromRegCenter.getShardingTotalCount());
-        JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(elasticJob), jobConfigFromRegCenter.getJobName());
+        JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(jobConfigFromRegCenter), jobConfigFromRegCenter.getJobName());
         JobRegistry.getInstance().registerJob(jobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
         schedulerFacade.registerStartUpInfo(!jobConfigFromRegCenter.isDisabled());
         jobScheduleController.scheduleJob(jobConfigFromRegCenter.getCron());
@@ -130,9 +137,12 @@ public final class JobScheduler {
         return result;
     }
     
-    private JobDetail createJobDetail(final ElasticJob elasticJob) {
+    private JobDetail createJobDetail(final JobConfiguration jobConfig) {
         JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobConfig.getJobName()).build();
-        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
+        result.getJobDataMap().put(REG_CENTER_DATA_MAP_KEY, regCenter);
+        result.getJobDataMap().put(JOB_CONFIG_DATA_MAP_KEY, jobConfig);
+        result.getJobDataMap().put(JOB_LISTENERS_DATA_MAP_KEY, elasticJobListeners);
+        result.getJobDataMap().put(TRACING_CONFIG_DATA_MAP_KEY, tracingConfig);
         if (null != elasticJob && !elasticJob.getClass().getName().equals(ScriptJob.class.getName())) {
             result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJob);
         }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
index 874e770..f872fb2 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.api.dataflow.DataflowJob;
+import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
 import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.exception.ExceptionUtils;
@@ -33,12 +34,16 @@ import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.SimpleJobExe
 import org.apache.shardingsphere.elasticjob.lite.handler.error.JobErrorHandler;
 import org.apache.shardingsphere.elasticjob.lite.handler.error.JobErrorHandlerFactory;
 import org.apache.shardingsphere.elasticjob.lite.handler.threadpool.JobExecutorServiceHandlerFactory;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.LiteJobFacade;
+import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobExecutionEvent.ExecutionSource;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobStatusTraceEvent.State;
 import org.apache.shardingsphere.elasticjob.lite.util.env.IpUtils;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -52,27 +57,33 @@ public final class ElasticJobExecutor {
     
     private final ElasticJob elasticJob;
     
+    private final JobConfiguration jobConfig;
+    
     private final JobFacade jobFacade;
     
     private final JobItemExecutor jobItemExecutor;
     
-    private final JobConfiguration jobConfig;
-    
-    private final String jobName;
-    
     private final ExecutorService executorService;
     
     private final JobErrorHandler jobErrorHandler;
     
     private final Map<Integer, String> itemErrorMessages;
     
-    public ElasticJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
+    public ElasticJobExecutor(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
+        this(elasticJob, jobConfig, new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners));
+    }
+    
+    public ElasticJobExecutor(final CoordinatorRegistryCenter regCenter, 
+                              final ElasticJob elasticJob, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners, final TracingConfiguration tracingConfig) {
+        this(elasticJob, jobConfig, new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig));
+    }
+    
+    private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {
         this.elasticJob = elasticJob;
+        this.jobConfig = jobConfig;
         this.jobFacade = jobFacade;
         jobItemExecutor = getJobItemExecutor(elasticJob);
-        jobConfig = jobFacade.loadJobConfiguration(true);
-        jobName = jobConfig.getJobName();
-        executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobName);
+        executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobConfig.getJobName());
         jobErrorHandler = JobErrorHandlerFactory.getHandler(jobConfig.getJobErrorHandlerType());
         itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
     }
@@ -98,13 +109,13 @@ public final class ElasticJobExecutor {
         try {
             jobFacade.checkJobExecutionEnvironment();
         } catch (final JobExecutionEnvironmentException cause) {
-            jobErrorHandler.handleException(jobName, cause);
+            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
         ShardingContexts shardingContexts = jobFacade.getShardingContexts();
-        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
+        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
         if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
-                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
+                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                     shardingContexts.getShardingItemParameters().keySet()));
             return;
         }
@@ -113,7 +124,7 @@ public final class ElasticJobExecutor {
             //CHECKSTYLE:OFF
         } catch (final Throwable cause) {
             //CHECKSTYLE:ON
-            jobErrorHandler.handleException(jobName, cause);
+            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
         execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
         while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
@@ -126,13 +137,13 @@ public final class ElasticJobExecutor {
             //CHECKSTYLE:OFF
         } catch (final Throwable cause) {
             //CHECKSTYLE:ON
-            jobErrorHandler.handleException(jobName, cause);
+            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
     }
     
     private void execute(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
         if (shardingContexts.getShardingItemParameters().isEmpty()) {
-            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
+            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
             return;
         }
         jobFacade.registerJobBegin(shardingContexts);
@@ -155,13 +166,13 @@ public final class ElasticJobExecutor {
         Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
         if (1 == items.size()) {
             int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
-            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobName, executionSource, item);
+            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
             process(shardingContexts, item, jobExecutionEvent);
             return;
         }
         final CountDownLatch latch = new CountDownLatch(items.size());
-        for (final int each : items) {
-            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobName, executionSource, each);
+        for (int each : items) {
+            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
             if (executorService.isShutdown()) {
                 return;
             }
@@ -183,12 +194,12 @@ public final class ElasticJobExecutor {
     @SuppressWarnings("unchecked")
     private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
         jobFacade.postJobExecutionEvent(startEvent);
-        log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
+        log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
         JobExecutionEvent completeEvent;
         try {
             jobItemExecutor.process(elasticJob, jobConfig, jobFacade, new ShardingContext(shardingContexts, item));
             completeEvent = startEvent.executionSuccess();
-            log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
+            log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
             jobFacade.postJobExecutionEvent(completeEvent);
             // CHECKSTYLE:OFF
         } catch (final Throwable cause) {
@@ -196,7 +207,7 @@ public final class ElasticJobExecutor {
             completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
             jobFacade.postJobExecutionEvent(completeEvent);
             itemErrorMessages.put(item, ExceptionUtils.transform(cause));
-            jobErrorHandler.handleException(jobName, cause);
+            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
     }
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJob.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJob.java
index a643aed..03fab7c 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJob.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJob.java
@@ -17,25 +17,38 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
 
+import lombok.Setter;
 import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
+import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
-import lombok.Setter;
+import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 
+import java.util.List;
+
 /**
  * Lite job class.
  */
 @Setter
 public final class LiteJob implements Job {
     
+    private CoordinatorRegistryCenter regCenter;
+    
     private ElasticJob elasticJob;
     
-    private JobFacade jobFacade;
+    private JobConfiguration jobConfig;
+    
+    private List<ElasticJobListener> elasticJobListeners;
+    
+    private TracingConfiguration tracingConfig;
     
     @Override
     public void execute(final JobExecutionContext context) {
-        new ElasticJobExecutor(elasticJob, jobFacade).execute();
+        ElasticJobExecutor executor = null == tracingConfig
+                ? new ElasticJobExecutor(regCenter, elasticJob, jobConfig, elasticJobListeners) : new ElasticJobExecutor(regCenter, elasticJob, jobConfig, elasticJobListeners, tracingConfig);
+        executor.execute();
     }
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
index 1bcf81e..4fe4d23 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionServ
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.lite.tracing.JobEventBus;
+import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobStatusTraceEvent;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobStatusTraceEvent.Source;
@@ -62,7 +63,15 @@ public final class LiteJobFacade implements JobFacade {
     
     private final JobEventBus jobEventBus;
     
-    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
+    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
+        this(regCenter, jobName, elasticJobListeners, new JobEventBus());
+    }
+    
+    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final TracingConfiguration tracingConfig) {
+        this(regCenter, jobName, elasticJobListeners, new JobEventBus(tracingConfig));
+    }
+    
+    private LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
         configService = new ConfigurationService(regCenter, jobName);
         shardingService = new ShardingService(regCenter, jobName);
         executionContextService = new ExecutionContextService(regCenter, jobName);
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
index 4c22e1e..e3aa7b6 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
@@ -25,6 +25,8 @@ import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
 import org.apache.shardingsphere.elasticjob.lite.fixture.job.JobCaller;
 import org.apache.shardingsphere.elasticjob.lite.fixture.job.TestDataflowJob;
+import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,6 +46,9 @@ import static org.mockito.Mockito.when;
 public final class DataflowJobExecutorTest {
     
     @Mock
+    private CoordinatorRegistryCenter regCenter;
+    
+    @Mock
     private JobCaller jobCaller;
     
     @Mock
@@ -55,7 +60,6 @@ public final class DataflowJobExecutorTest {
     
     @After
     public void tearDown() {
-        verify(jobFacade).loadJobConfiguration(true);
         ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
     }
     
@@ -172,9 +176,9 @@ public final class DataflowJobExecutorTest {
         this.shardingContexts = shardingContexts;
         JobConfiguration jobConfig = JobConfiguration.newBuilder(ShardingContextsBuilder.JOB_NAME, JobType.DATAFLOW, "0/1 * * * * ?", 3)
                 .jobErrorHandlerType("IGNORE").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.toString(isStreamingProcess)).build();
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(jobConfig);
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
-        elasticJobExecutor = new ElasticJobExecutor(new TestDataflowJob(jobCaller), jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(regCenter, new TestDataflowJob(jobCaller), jobConfig, Collections.emptyList());
+        ReflectionUtils.setFieldValue(elasticJobExecutor, "jobFacade", jobFacade);
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/ScriptJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/ScriptJobExecutorTest.java
index c4371ad..ec561e2 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/ScriptJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/ScriptJobExecutorTest.java
@@ -24,18 +24,22 @@ import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
+import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import java.util.Collections;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class ScriptJobExecutorTest {
     
     @Mock
+    private CoordinatorRegistryCenter regCenter;
+    
+    @Mock
     private JobFacade jobFacade;
     
     private ElasticJobExecutor elasticJobExecutor;
@@ -43,8 +47,8 @@ public final class ScriptJobExecutorTest {
     @Test
     public void assertExecuteWhenCommandLineIsEmpty() {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, ShardingContextsBuilder.getMultipleShardingContexts());
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration("", "IGNORE"));
-        elasticJobExecutor = new ElasticJobExecutor(null, jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(regCenter, null, createJobConfiguration("", "IGNORE"), Collections.emptyList());
+        ReflectionUtils.setFieldValue(elasticJobExecutor, "jobFacade", jobFacade);
         elasticJobExecutor.execute();
     }
     
@@ -60,8 +64,8 @@ public final class ScriptJobExecutorTest {
     
     private void assertExecuteWhenExecuteFailure(final ShardingContexts shardingContexts) {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration("not_exists_file", "THROW"));
-        elasticJobExecutor = new ElasticJobExecutor(null, jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(regCenter, null, createJobConfiguration("not_exists_file", "THROW"), Collections.emptyList());
+        ReflectionUtils.setFieldValue(elasticJobExecutor, "jobFacade", jobFacade);
         elasticJobExecutor.execute();
     }
     
@@ -77,10 +81,9 @@ public final class ScriptJobExecutorTest {
     
     private void assertExecuteSuccess(final ShardingContexts shardingContexts) {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration("exists_file param0 param1", "IGNORE"));
-        elasticJobExecutor = new ElasticJobExecutor(null, jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(regCenter, null, createJobConfiguration("exists_file param0 param1", "IGNORE"), Collections.emptyList());
+        ReflectionUtils.setFieldValue(elasticJobExecutor, "jobFacade", jobFacade);
         elasticJobExecutor.execute();
-        verify(jobFacade).loadJobConfiguration(true);
     }
     
     private JobConfiguration createJobConfiguration(final String scriptCommandLine, final String jobErrorHandlerType) {
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/SimpleJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/SimpleJobExecutorTest.java
index cac00b5..38d0d42 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/SimpleJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/SimpleJobExecutorTest.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.elasticjob.lite.fixture.job.JobCaller;
 import org.apache.shardingsphere.elasticjob.lite.fixture.job.TestSimpleJob;
 import org.apache.shardingsphere.elasticjob.lite.handler.error.impl.LogJobErrorHandler;
 import org.apache.shardingsphere.elasticjob.lite.handler.threadpool.impl.CPUUsageJobExecutorServiceHandler;
+import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobStatusTraceEvent.State;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.junit.Before;
@@ -50,6 +51,9 @@ import static org.mockito.Mockito.when;
 public final class SimpleJobExecutorTest {
     
     @Mock
+    private CoordinatorRegistryCenter regCenter;
+    
+    @Mock
     private JobCaller jobCaller;
     
     @Mock
@@ -59,8 +63,8 @@ public final class SimpleJobExecutorTest {
     
     @Before
     public void setUp() {
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration(null, "THROW"));
-        elasticJobExecutor = new ElasticJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(regCenter, new TestSimpleJob(jobCaller), createJobConfiguration(null, "THROW"), Collections.emptyList());
+        ReflectionUtils.setFieldValue(elasticJobExecutor, "jobFacade", jobFacade);
     }
     
     private JobConfiguration createJobConfiguration(final String jobExecutorServiceHandlerType, final String jobErrorHandlerType) {
@@ -71,8 +75,7 @@ public final class SimpleJobExecutorTest {
     
     @Test
     public void assertNewExecutorWithDefaultHandlers() {
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration(null, null));
-        elasticJobExecutor = new ElasticJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(regCenter, new TestSimpleJob(jobCaller), createJobConfiguration(null, null), Collections.emptyList());
         assertThat(ReflectionUtils.getFieldValue(elasticJobExecutor, "executorService"), instanceOf(new CPUUsageJobExecutorServiceHandler().createExecutorService("test_job").getClass()));
         assertThat(ReflectionUtils.getFieldValue(elasticJobExecutor, "jobErrorHandler"), instanceOf(LogJobErrorHandler.class));
     }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/WrongJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/WrongJobExecutorTest.java
index 86f94aa..270de14 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/WrongJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/WrongJobExecutorTest.java
@@ -24,13 +24,16 @@ import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
 import org.apache.shardingsphere.elasticjob.lite.fixture.job.TestWrongJob;
+import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.lite.tracing.event.JobStatusTraceEvent.State;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,14 +44,17 @@ import static org.mockito.Mockito.when;
 public final class WrongJobExecutorTest {
     
     @Mock
+    private CoordinatorRegistryCenter regCenter;
+    
+    @Mock
     private JobFacade jobFacade;
     
     private ElasticJobExecutor wrongJobExecutor;
     
     @Before
     public void setUp() {
-        when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration());
-        wrongJobExecutor = new ElasticJobExecutor(new TestWrongJob(), jobFacade);
+        wrongJobExecutor = new ElasticJobExecutor(regCenter, new TestWrongJob(), createJobConfiguration(), Collections.emptyList());
+        ReflectionUtils.setFieldValue(wrongJobExecutor, "jobFacade", jobFacade);
     }
     
     private JobConfiguration createJobConfiguration() {
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
index 613a9f9..f4f0b45 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
@@ -66,7 +66,7 @@ public final class LiteJobFacadeTest {
     private FailoverService failoverService;
     
     @Mock
-    private JobEventBus eventBus;
+    private JobEventBus jobEventBus;
     
     @Mock
     private ElasticJobListenerCaller caller;
@@ -75,12 +75,13 @@ public final class LiteJobFacadeTest {
     
     @Before
     public void setUp() {
-        liteJobFacade = new LiteJobFacade(null, "test_job", Collections.singletonList(new TestElasticJobListener(caller)), eventBus);
+        liteJobFacade = new LiteJobFacade(null, "test_job", Collections.singletonList(new TestElasticJobListener(caller)));
         ReflectionUtils.setFieldValue(liteJobFacade, "configService", configService);
         ReflectionUtils.setFieldValue(liteJobFacade, "shardingService", shardingService);
         ReflectionUtils.setFieldValue(liteJobFacade, "executionContextService", executionContextService);
         ReflectionUtils.setFieldValue(liteJobFacade, "executionService", executionService);
         ReflectionUtils.setFieldValue(liteJobFacade, "failoverService", failoverService);
+        ReflectionUtils.setFieldValue(liteJobFacade, "jobEventBus", jobEventBus);
     }
     
     @Test
@@ -238,6 +239,6 @@ public final class LiteJobFacadeTest {
     @Test
     public void assertPostJobExecutionEvent() {
         liteJobFacade.postJobExecutionEvent(null);
-        verify(eventBus).post(null);
+        verify(jobEventBus).post(null);
     }
 }