You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/20 06:50:06 UTC

[shardingsphere-elasticjob-lite] branch master updated: Decouple JobItemExecutor and JobExecutor (#796)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git


The following commit(s) were added to refs/heads/master by this push:
     new da3fda5  Decouple JobItemExecutor and JobExecutor (#796)
da3fda5 is described below

commit da3fda56a0e51b123e9a0a915f9c780a2626b7e0
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Jun 20 14:49:59 2020 +0800

    Decouple JobItemExecutor and JobExecutor (#796)
    
    * Avoid inner class import
    
    * Use JobItemExecutor to decouple job executor
    
    * refactor JobItemExecutor
---
 .../lite/executor/ElasticJobExecutor.java          | 23 ++++++-----
 .../lite/executor/JobExecutorFactory.java          | 12 ++++--
 .../lite/executor/type/DataflowJobExecutor.java    | 45 ++++++++++------------
 ...SimpleJobExecutor.java => JobItemExecutor.java} | 30 +++++++--------
 .../lite/executor/type/ScriptJobExecutor.java      | 13 +++----
 .../lite/executor/type/SimpleJobExecutor.java      | 15 ++------
 .../lite/executor/JobExecutorFactoryTest.java      | 17 ++++++--
 .../executor/type/DataflowJobExecutorTest.java     | 21 +++++-----
 .../lite/executor/type/ScriptJobExecutorTest.java  | 15 ++++----
 .../lite/executor/type/SimpleJobExecutorTest.java  | 30 +++++++--------
 .../lite/executor/type/WrongJobExecutorTest.java   |  9 +++--
 11 files changed, 119 insertions(+), 111 deletions(-)

diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
index a2f23b5..4464aa2 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
@@ -17,9 +17,8 @@
 
 package org.apache.shardingsphere.elasticjob.lite.executor;
 
-import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.event.type.JobExecutionEvent;
@@ -32,6 +31,7 @@ import org.apache.shardingsphere.elasticjob.lite.executor.handler.ExecutorServic
 import org.apache.shardingsphere.elasticjob.lite.executor.handler.ExecutorServiceHandlerRegistry;
 import org.apache.shardingsphere.elasticjob.lite.executor.handler.JobExceptionHandler;
 import org.apache.shardingsphere.elasticjob.lite.executor.handler.JobProperties.JobPropertiesEnum;
+import org.apache.shardingsphere.elasticjob.lite.executor.type.JobItemExecutor;
 
 import java.util.Collection;
 import java.util.Map;
@@ -43,12 +43,12 @@ import java.util.concurrent.ExecutorService;
  * ElasticJob executor.
  */
 @Slf4j
-public abstract class ElasticJobExecutor {
+public final class ElasticJobExecutor {
+    
+    private final ElasticJob elasticJob;
     
-    @Getter(AccessLevel.PROTECTED)
     private final JobFacade jobFacade;
     
-    @Getter(AccessLevel.PROTECTED)
     private final JobRootConfiguration jobRootConfig;
     
     private final String jobName;
@@ -59,13 +59,17 @@ public abstract class ElasticJobExecutor {
     
     private final Map<Integer, String> itemErrorMessages;
     
-    protected ElasticJobExecutor(final JobFacade jobFacade) {
+    private final JobItemExecutor jobItemExecutor;
+    
+    public ElasticJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
+        this.elasticJob = elasticJob;
         this.jobFacade = jobFacade;
         jobRootConfig = jobFacade.loadJobRootConfiguration(true);
         jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
         executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
         jobExceptionHandler = (JobExceptionHandler) getHandler(JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
         itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
+        this.jobItemExecutor = jobItemExecutor;
     }
     
     private Object getHandler(final JobPropertiesEnum jobPropertiesEnum) {
@@ -93,7 +97,7 @@ public abstract class ElasticJobExecutor {
     /**
      * Execute job.
      */
-    public final void execute() {
+    public void execute() {
         try {
             jobFacade.checkJobExecutionEnvironment();
         } catch (final JobExecutionEnvironmentException cause) {
@@ -191,6 +195,7 @@ public abstract class ElasticJobExecutor {
         }
     }
     
+    @SuppressWarnings("unchecked")
     private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
         if (shardingContexts.isAllowSendJobEvent()) {
             jobFacade.postJobExecutionEvent(startEvent);
@@ -198,7 +203,7 @@ public abstract class ElasticJobExecutor {
         log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
         JobExecutionEvent completeEvent;
         try {
-            process(new ShardingContext(shardingContexts, item));
+            jobItemExecutor.process(elasticJob, jobRootConfig, jobFacade, new ShardingContext(shardingContexts, item));
             completeEvent = startEvent.executionSuccess();
             log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
             if (shardingContexts.isAllowSendJobEvent()) {
@@ -213,6 +218,4 @@ public abstract class ElasticJobExecutor {
             jobExceptionHandler.handleException(jobName, cause);
         }
     }
-    
-    protected abstract void process(ShardingContext shardingContext);
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java
index 9f9a16e..06d7ae1 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.elasticjob.lite.api.dataflow.DataflowJob;
 import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
 import org.apache.shardingsphere.elasticjob.lite.executor.type.DataflowJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.executor.type.JobItemExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.type.ScriptJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.type.SimpleJobExecutor;
 
@@ -42,14 +43,19 @@ public final class JobExecutorFactory {
      */
     @SuppressWarnings("unchecked")
     public static ElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
+        return new ElasticJobExecutor(elasticJob, jobFacade, getJobItemExecutor(elasticJob));
+    }
+    
+    @SuppressWarnings("unchecked")
+    private static JobItemExecutor getJobItemExecutor(final ElasticJob elasticJob) {
         if (null == elasticJob) {
-            return new ScriptJobExecutor(jobFacade);
+            return new ScriptJobExecutor();
         }
         if (elasticJob instanceof SimpleJob) {
-            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
+            return new SimpleJobExecutor();
         }
         if (elasticJob instanceof DataflowJob) {
-            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
+            return new DataflowJobExecutor();
         }
         throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
     }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java
index 7fa27d1..05a7258 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.elasticjob.lite.executor.type;
 
 import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.api.dataflow.DataflowJob;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.config.dataflow.DataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 
 import java.util.List;
@@ -28,48 +28,43 @@ import java.util.List;
 /**
  * Dataflow job executor.
  */
-public final class DataflowJobExecutor extends ElasticJobExecutor {
-    
-    private final DataflowJob<Object> dataflowJob;
-    
-    public DataflowJobExecutor(final DataflowJob<Object> dataflowJob, final JobFacade jobFacade) {
-        super(jobFacade);
-        this.dataflowJob = dataflowJob;
-    }
+public final class DataflowJobExecutor implements JobItemExecutor<DataflowJob> {
     
     @Override
-    protected void process(final ShardingContext shardingContext) {
-        DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
+    public void process(final DataflowJob elasticJob, final JobRootConfiguration jobRootConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
+        DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) jobRootConfig.getTypeConfig();
         if (dataflowConfig.isStreamingProcess()) {
-            streamingExecute(shardingContext);
+            streamingExecute(elasticJob, jobFacade, shardingContext);
         } else {
-            oneOffExecute(shardingContext);
+            oneOffExecute(elasticJob, shardingContext);
         }
     }
     
-    private void streamingExecute(final ShardingContext shardingContext) {
-        List<Object> data = fetchData(shardingContext);
+    private void streamingExecute(final DataflowJob elasticJob, final JobFacade jobFacade, final ShardingContext shardingContext) {
+        List<Object> data = fetchData(elasticJob, shardingContext);
         while (null != data && !data.isEmpty()) {
-            processData(shardingContext, data);
-            if (!getJobFacade().isEligibleForJobRunning()) {
+            processData(elasticJob, shardingContext, data);
+            if (!jobFacade.isEligibleForJobRunning()) {
                 break;
             }
-            data = fetchData(shardingContext);
+            data = fetchData(elasticJob, shardingContext);
         }
     }
     
-    private void oneOffExecute(final ShardingContext shardingContext) {
-        List<Object> data = fetchData(shardingContext);
+    private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
+        List<Object> data = fetchData(elasticJob, shardingContext);
         if (null != data && !data.isEmpty()) {
-            processData(shardingContext, data);
+            processData(elasticJob, shardingContext, data);
         }
     }
     
-    private List<Object> fetchData(final ShardingContext shardingContext) {
-        return dataflowJob.fetchData(shardingContext);
+    @SuppressWarnings("unchecked")
+    private List<Object> fetchData(final DataflowJob elasticJob, final ShardingContext shardingContext) {
+        return elasticJob.fetchData(shardingContext);
     }
     
-    private void processData(final ShardingContext shardingContext, final List<Object> data) {
-        dataflowJob.processData(shardingContext, data);
+    @SuppressWarnings("unchecked")
+    private void processData(final DataflowJob elasticJob, final ShardingContext shardingContext, final List<Object> data) {
+        elasticJob.processData(shardingContext, data);
     }
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/JobItemExecutor.java
similarity index 63%
copy from elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
copy to elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/JobItemExecutor.java
index dc4f815..e746d9d 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/JobItemExecutor.java
@@ -17,25 +17,25 @@
 
 package org.apache.shardingsphere.elasticjob.lite.executor.type;
 
+import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 
 /**
- * Simple job executor.
+ * Job item executor.
+ * 
+ * @param <T> type of ElasticJob
  */
-public final class SimpleJobExecutor extends ElasticJobExecutor {
+public interface JobItemExecutor<T extends ElasticJob> {
     
-    private final SimpleJob simpleJob;
-    
-    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
-        super(jobFacade);
-        this.simpleJob = simpleJob;
-    }
-    
-    @Override
-    protected void process(final ShardingContext shardingContext) {
-        simpleJob.execute(shardingContext);
-    }
+    /**
+     * Process job item.
+     * 
+     * @param elasticJob elastic job
+     * @param jobRootConfig job root configuration
+     * @param jobFacade job facade
+     * @param shardingContext sharding context
+     */
+    void process(T elasticJob, JobRootConfiguration jobRootConfig, JobFacade jobFacade, ShardingContext shardingContext);
 }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java
index 0cb5f01..6e7cbfd 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java
@@ -21,9 +21,10 @@ import com.google.common.base.Strings;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.lite.api.script.ScriptJob;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.config.script.ScriptJobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.util.json.GsonFactory;
 
@@ -32,15 +33,11 @@ import java.io.IOException;
 /**
  * Script job executor.
  */
-public final class ScriptJobExecutor extends ElasticJobExecutor {
-    
-    public ScriptJobExecutor(final JobFacade jobFacade) {
-        super(jobFacade);
-    }
+public final class ScriptJobExecutor implements JobItemExecutor<ScriptJob> {
     
     @Override
-    protected void process(final ShardingContext shardingContext) {
-        final String scriptCommandLine = ((ScriptJobConfiguration) getJobRootConfig().getTypeConfig()).getScriptCommandLine();
+    public void process(final ScriptJob elasticJob, final JobRootConfiguration jobRootConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
+        String scriptCommandLine = ((ScriptJobConfiguration) jobRootConfig.getTypeConfig()).getScriptCommandLine();
         if (Strings.isNullOrEmpty(scriptCommandLine)) {
             throw new JobConfigurationException("Cannot find script command line for job '%s', job is not executed.", shardingContext.getJobName());
         }
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
index dc4f815..f7321b5 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
@@ -19,23 +19,16 @@ package org.apache.shardingsphere.elasticjob.lite.executor.type;
 
 import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 
 /**
  * Simple job executor.
  */
-public final class SimpleJobExecutor extends ElasticJobExecutor {
-    
-    private final SimpleJob simpleJob;
-    
-    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
-        super(jobFacade);
-        this.simpleJob = simpleJob;
-    }
+public final class SimpleJobExecutor implements JobItemExecutor<SimpleJob> {
     
     @Override
-    protected void process(final ShardingContext shardingContext) {
-        simpleJob.execute(shardingContext);
+    public void process(final SimpleJob elasticJob, final JobRootConfiguration jobRootConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
+        elasticJob.execute(shardingContext);
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java
index ca4c311..51f27d2 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.shardingsphere.elasticjob.lite.executor;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
 import org.apache.shardingsphere.elasticjob.lite.executor.type.DataflowJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.executor.type.JobItemExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.type.ScriptJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.type.SimpleJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.fixture.config.TestDataflowJobConfiguration;
@@ -33,6 +35,8 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -47,19 +51,19 @@ public final class JobExecutorFactoryTest {
     @Test
     public void assertGetJobExecutorForScriptJob() {
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh", IgnoreJobExceptionHandler.class));
-        assertThat(JobExecutorFactory.getJobExecutor(null, jobFacade), instanceOf(ScriptJobExecutor.class));
+        assertThat(getJobItemExecutor(JobExecutorFactory.getJobExecutor(null, jobFacade)), instanceOf(ScriptJobExecutor.class));
     }
     
     @Test
     public void assertGetJobExecutorForSimpleJob() {
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
-        assertThat(JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade), instanceOf(SimpleJobExecutor.class));
+        assertThat(getJobItemExecutor(JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade)), instanceOf(SimpleJobExecutor.class));
     }
     
     @Test
     public void assertGetJobExecutorForDataflowJob() {
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(false));
-        assertThat(JobExecutorFactory.getJobExecutor(new TestDataflowJob(null), jobFacade), instanceOf(DataflowJobExecutor.class));
+        assertThat(getJobItemExecutor(JobExecutorFactory.getJobExecutor(new TestDataflowJob(null), jobFacade)), instanceOf(DataflowJobExecutor.class));
     }
     
     @Test(expected = JobConfigurationException.class)
@@ -74,4 +78,11 @@ public final class JobExecutorFactoryTest {
         ElasticJobExecutor anotherExecutor = JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade);
         assertTrue(executor.hashCode() != anotherExecutor.hashCode());
     }
+    
+    @SneakyThrows
+    private JobItemExecutor getJobItemExecutor(final ElasticJobExecutor elasticJobExecutor) {
+        Field field = ElasticJobExecutor.class.getDeclaredField("jobItemExecutor");
+        field.setAccessible(true);
+        return (JobItemExecutor) field.get(elasticJobExecutor);
+    }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java
index 46a0b55..a5a8af2 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.elasticjob.lite.executor.type;
 
+import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
@@ -49,7 +50,7 @@ public final class DataflowJobExecutorTest {
     
     private ShardingContexts shardingContexts;
     
-    private DataflowJobExecutor dataflowJobExecutor;
+    private ElasticJobExecutor elasticJobExecutor;
     
     @After
     public void tearDown() {
@@ -62,7 +63,7 @@ public final class DataflowJobExecutorTest {
         setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
         when(jobCaller.fetchData(0)).thenReturn(null);
         when(jobCaller.fetchData(1)).thenReturn(Collections.emptyList());
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller).fetchData(0);
         verify(jobCaller).fetchData(1);
         verify(jobCaller, times(0)).processData(any());
@@ -72,7 +73,7 @@ public final class DataflowJobExecutorTest {
     public void assertExecuteWhenFetchDataIsNotEmptyForUnStreamingProcessAndSingleShardingItem() {
         setUp(false, ShardingContextsBuilder.getSingleShardingContexts());
         doThrow(new IllegalStateException()).when(jobCaller).fetchData(0);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller).fetchData(0);
         verify(jobCaller, times(0)).processData(any());
     }
@@ -83,7 +84,7 @@ public final class DataflowJobExecutorTest {
         when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
         doThrow(new IllegalStateException()).when(jobCaller).processData(4);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller).fetchData(0);
         verify(jobCaller).fetchData(1);
         verify(jobCaller).processData(1);
@@ -98,7 +99,7 @@ public final class DataflowJobExecutorTest {
         setUp(true, ShardingContextsBuilder.getSingleShardingContexts());
         when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
         when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
         verify(jobCaller).processData(1);
     }
@@ -110,7 +111,7 @@ public final class DataflowJobExecutorTest {
         when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
         when(jobCaller.fetchData(1)).thenReturn(Collections.singletonList(2), Collections.emptyList());
         when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
         verify(jobCaller, times(2)).fetchData(1);
         verify(jobCaller).processData(1);
@@ -125,7 +126,7 @@ public final class DataflowJobExecutorTest {
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(2, 3), Collections.emptyList());
         when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
         doThrow(new IllegalStateException()).when(jobCaller).processData(2);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
         verify(jobCaller, times(1)).fetchData(1);
         verify(jobCaller).processData(1);
@@ -141,7 +142,7 @@ public final class DataflowJobExecutorTest {
         when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2), Collections.emptyList());
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4), Collections.emptyList());
         doThrow(new IllegalStateException()).when(jobCaller).processData(4);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
         verify(jobCaller, times(1)).fetchData(1);
         verify(jobCaller).processData(1);
@@ -157,7 +158,7 @@ public final class DataflowJobExecutorTest {
         when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
         doThrow(new IllegalStateException()).when(jobCaller).processData(4);
-        dataflowJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobCaller).fetchData(0);
         verify(jobCaller).fetchData(1);
         verify(jobCaller).processData(1);
@@ -170,7 +171,7 @@ public final class DataflowJobExecutorTest {
         this.shardingContexts = shardingContexts;
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(isStreamingProcess));
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
-        dataflowJobExecutor = new DataflowJobExecutor(new TestDataflowJob(jobCaller), jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(new TestDataflowJob(jobCaller), jobFacade, new DataflowJobExecutor());
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java
index 7704856..2ef8285 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.executor.type;
 
 import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
+import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
@@ -38,14 +39,14 @@ public final class ScriptJobExecutorTest {
     @Mock
     private JobFacade jobFacade;
     
-    private ScriptJobExecutor scriptJobExecutor;
+    private ElasticJobExecutor elasticJobExecutor;
     
     @Test
     public void assertExecuteWhenCommandLineIsEmpty() {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, ShardingContextsBuilder.getMultipleShardingContexts());
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("", IgnoreJobExceptionHandler.class));
-        scriptJobExecutor = new ScriptJobExecutor(jobFacade);
-        scriptJobExecutor.execute();
+        elasticJobExecutor = new ElasticJobExecutor(null, jobFacade, new ScriptJobExecutor());
+        elasticJobExecutor.execute();
     }
     
     @Test(expected = JobSystemException.class)
@@ -61,8 +62,8 @@ public final class ScriptJobExecutorTest {
     private void assertExecuteWhenExecuteFailure(final ShardingContexts shardingContexts) {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("not_exists_file", ThrowJobExceptionHandler.class));
-        scriptJobExecutor = new ScriptJobExecutor(jobFacade);
-        scriptJobExecutor.execute();
+        elasticJobExecutor = new ElasticJobExecutor(null, jobFacade, new ScriptJobExecutor());
+        elasticJobExecutor.execute();
     }
     
     @Test
@@ -78,8 +79,8 @@ public final class ScriptJobExecutorTest {
     private void assertExecuteSuccess(final ShardingContexts shardingContexts) {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("exists_file param0 param1", IgnoreJobExceptionHandler.class));
-        scriptJobExecutor = new ScriptJobExecutor(jobFacade);
-        scriptJobExecutor.execute();
+        elasticJobExecutor = new ElasticJobExecutor(null, jobFacade, new ScriptJobExecutor());
+        elasticJobExecutor.execute();
         verify(jobFacade).loadJobRootConfiguration(true);
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java
index be0816a..1f02ca5 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java
@@ -54,21 +54,21 @@ public final class SimpleJobExecutorTest {
     @Mock
     private JobFacade jobFacade;
     
-    private SimpleJobExecutor simpleJobExecutor;
+    private ElasticJobExecutor elasticJobExecutor;
     
     @Before
     public void setUp() {
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
-        simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
+        elasticJobExecutor = new ElasticJobExecutor(new TestSimpleJob(jobCaller), jobFacade, new SimpleJobExecutor());
     }
     
     @Test
     public void assertNewExecutorWithDefaultHandlers() throws NoSuchFieldException {
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration("ErrorHandler", Object.class.getName()));
-        SimpleJobExecutor simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
-        assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, ElasticJobExecutor.class.getDeclaredField("executorService")), 
+        elasticJobExecutor = new ElasticJobExecutor(new TestSimpleJob(jobCaller), jobFacade, new SimpleJobExecutor());
+        assertThat(ReflectionUtils.getFieldValue(elasticJobExecutor, ElasticJobExecutor.class.getDeclaredField("executorService")), 
                 instanceOf(new DefaultExecutorServiceHandler().createExecutorService("test_job").getClass()));
-        assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, ElasticJobExecutor.class.getDeclaredField("jobExceptionHandler")),
+        assertThat(ReflectionUtils.getFieldValue(elasticJobExecutor, ElasticJobExecutor.class.getDeclaredField("jobExceptionHandler")),
                 instanceOf(DefaultJobExceptionHandler.class));
     }
     
@@ -76,7 +76,7 @@ public final class SimpleJobExecutorTest {
     public void assertExecuteWhenCheckMaxTimeDiffSecondsIntolerable() throws JobExecutionEnvironmentException {
         doThrow(JobExecutionEnvironmentException.class).when(jobFacade).checkJobExecutionEnvironment();
         try {
-            simpleJobExecutor.execute();
+            elasticJobExecutor.execute();
         } finally {
             verify(jobFacade).checkJobExecutionEnvironment();
             verify(jobCaller, times(0)).execute();
@@ -88,7 +88,7 @@ public final class SimpleJobExecutorTest {
         ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
         when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true);
-        simpleJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, 
                 "Previous job 'test_job' - shardingItems '[]' is still running, misfired job will start after previous job completed.");
@@ -102,7 +102,7 @@ public final class SimpleJobExecutorTest {
     public void assertExecuteWhenShardingItemsIsEmpty() throws JobExecutionEnvironmentException {
         ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
-        simpleJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, "Sharding item for job 'test_job' is empty.");
         verify(jobFacade).checkJobExecutionEnvironment();
@@ -125,7 +125,7 @@ public final class SimpleJobExecutorTest {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
         doThrow(RuntimeException.class).when(jobCaller).execute();
         try {
-            simpleJobExecutor.execute();
+            elasticJobExecutor.execute();
         } finally {
             verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
             verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_RUNNING, "");
@@ -158,7 +158,7 @@ public final class SimpleJobExecutorTest {
     
     private void assertExecuteWhenRunOnceSuccess(final ShardingContexts shardingContexts) {
         ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
-        simpleJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, "");
         ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
@@ -170,7 +170,7 @@ public final class SimpleJobExecutorTest {
         ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
         when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
-        simpleJobExecutor.execute();
+        elasticJobExecutor.execute();
         ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
         verify(jobCaller, times(2)).execute();
     }
@@ -180,7 +180,7 @@ public final class SimpleJobExecutorTest {
         ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
         when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
-        simpleJobExecutor.execute();
+        elasticJobExecutor.execute();
         ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
         verify(jobCaller, times(2)).execute();
         verify(jobFacade, times(0)).clearMisfire(shardingContexts.getShardingItemParameters().keySet());
@@ -192,7 +192,7 @@ public final class SimpleJobExecutorTest {
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
         when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
         when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true, false);
-        simpleJobExecutor.execute();
+        elasticJobExecutor.execute();
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
         verify(jobFacade, times(2)).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_RUNNING, "");
         verify(jobFacade).checkJobExecutionEnvironment();
@@ -210,7 +210,7 @@ public final class SimpleJobExecutorTest {
         when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
         doThrow(RuntimeException.class).when(jobFacade).beforeJobExecuted(shardingContexts);
         try {
-            simpleJobExecutor.execute();
+            elasticJobExecutor.execute();
         } finally {
             verify(jobCaller, times(0)).execute();
         }
@@ -224,7 +224,7 @@ public final class SimpleJobExecutorTest {
         when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
         doThrow(RuntimeException.class).when(jobFacade).afterJobExecuted(shardingContexts);
         try {
-            simpleJobExecutor.execute();
+            elasticJobExecutor.execute();
         } finally {
             verify(jobCaller, times(2)).execute();
         }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java
index 49c812d..33a3000 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.executor.type;
 
 import org.apache.shardingsphere.elasticjob.lite.event.type.JobStatusTraceEvent.State;
+import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.fixture.config.TestSimpleJobConfiguration;
@@ -40,12 +41,12 @@ public final class WrongJobExecutorTest {
     @Mock
     private JobFacade jobFacade;
     
-    private SimpleJobExecutor wrongSimpleJobExecutor;
+    private ElasticJobExecutor wrongJobExecutor;
     
     @Before
     public void setUp() {
         when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
-        wrongSimpleJobExecutor = new SimpleJobExecutor(new TestWrongJob(), jobFacade);
+        wrongJobExecutor = new ElasticJobExecutor(new TestWrongJob(), jobFacade, new SimpleJobExecutor());
     }
     
     @Test(expected = RuntimeException.class)
@@ -54,7 +55,7 @@ public final class WrongJobExecutorTest {
         map.put(0, "A");
         ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", map);
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
-        wrongSimpleJobExecutor.execute();
+        wrongJobExecutor.execute();
     }
     
     @Test
@@ -64,7 +65,7 @@ public final class WrongJobExecutorTest {
         map.put(1, "B");
         ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", map);
         when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
-        wrongSimpleJobExecutor.execute();
+        wrongJobExecutor.execute();
         verify(jobFacade).getShardingContexts();
         verify(jobFacade).postJobStatusTraceEvent("fake_task_id", State.TASK_RUNNING, "");
     }