You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/20 06:50:06 UTC
[shardingsphere-elasticjob-lite] branch master updated: Decouple JobItemExecutor and JobExecutor (#796)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push:
new da3fda5 Decouple JobItemExecutor and JobExecutor (#796)
da3fda5 is described below
commit da3fda56a0e51b123e9a0a915f9c780a2626b7e0
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Jun 20 14:49:59 2020 +0800
Decouple JobItemExecutor and JobExecutor (#796)
* Avoid inner class import
* Use JobItemExecutor to decouple job executor
* Refactor JobItemExecutor
---
.../lite/executor/ElasticJobExecutor.java | 23 ++++++-----
.../lite/executor/JobExecutorFactory.java | 12 ++++--
.../lite/executor/type/DataflowJobExecutor.java | 45 ++++++++++------------
...SimpleJobExecutor.java => JobItemExecutor.java} | 30 +++++++--------
.../lite/executor/type/ScriptJobExecutor.java | 13 +++----
.../lite/executor/type/SimpleJobExecutor.java | 15 ++------
.../lite/executor/JobExecutorFactoryTest.java | 17 ++++++--
.../executor/type/DataflowJobExecutorTest.java | 21 +++++-----
.../lite/executor/type/ScriptJobExecutorTest.java | 15 ++++----
.../lite/executor/type/SimpleJobExecutorTest.java | 30 +++++++--------
.../lite/executor/type/WrongJobExecutorTest.java | 9 +++--
11 files changed, 119 insertions(+), 111 deletions(-)
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
index a2f23b5..4464aa2 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java
@@ -17,9 +17,8 @@
package org.apache.shardingsphere.elasticjob.lite.executor;
-import lombok.AccessLevel;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
import org.apache.shardingsphere.elasticjob.lite.event.type.JobExecutionEvent;
@@ -32,6 +31,7 @@ import org.apache.shardingsphere.elasticjob.lite.executor.handler.ExecutorServic
import org.apache.shardingsphere.elasticjob.lite.executor.handler.ExecutorServiceHandlerRegistry;
import org.apache.shardingsphere.elasticjob.lite.executor.handler.JobExceptionHandler;
import org.apache.shardingsphere.elasticjob.lite.executor.handler.JobProperties.JobPropertiesEnum;
+import org.apache.shardingsphere.elasticjob.lite.executor.type.JobItemExecutor;
import java.util.Collection;
import java.util.Map;
@@ -43,12 +43,12 @@ import java.util.concurrent.ExecutorService;
* ElasticJob executor.
*/
@Slf4j
-public abstract class ElasticJobExecutor {
+public final class ElasticJobExecutor {
+
+ private final ElasticJob elasticJob;
- @Getter(AccessLevel.PROTECTED)
private final JobFacade jobFacade;
- @Getter(AccessLevel.PROTECTED)
private final JobRootConfiguration jobRootConfig;
private final String jobName;
@@ -59,13 +59,17 @@ public abstract class ElasticJobExecutor {
private final Map<Integer, String> itemErrorMessages;
- protected ElasticJobExecutor(final JobFacade jobFacade) {
+ private final JobItemExecutor jobItemExecutor;
+
+ public ElasticJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
+ this.elasticJob = elasticJob;
this.jobFacade = jobFacade;
jobRootConfig = jobFacade.loadJobRootConfiguration(true);
jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
jobExceptionHandler = (JobExceptionHandler) getHandler(JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
+ this.jobItemExecutor = jobItemExecutor;
}
private Object getHandler(final JobPropertiesEnum jobPropertiesEnum) {
@@ -93,7 +97,7 @@ public abstract class ElasticJobExecutor {
/**
* Execute job.
*/
- public final void execute() {
+ public void execute() {
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
@@ -191,6 +195,7 @@ public abstract class ElasticJobExecutor {
}
}
+ @SuppressWarnings("unchecked")
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
@@ -198,7 +203,7 @@ public abstract class ElasticJobExecutor {
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
- process(new ShardingContext(shardingContexts, item));
+ jobItemExecutor.process(elasticJob, jobRootConfig, jobFacade, new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
@@ -213,6 +218,4 @@ public abstract class ElasticJobExecutor {
jobExceptionHandler.handleException(jobName, cause);
}
}
-
- protected abstract void process(ShardingContext shardingContext);
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java
index 9f9a16e..06d7ae1 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactory.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.elasticjob.lite.api.dataflow.DataflowJob;
import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.lite.executor.type.DataflowJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.executor.type.JobItemExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.type.ScriptJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.type.SimpleJobExecutor;
@@ -42,14 +43,19 @@ public final class JobExecutorFactory {
*/
@SuppressWarnings("unchecked")
public static ElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
+ return new ElasticJobExecutor(elasticJob, jobFacade, getJobItemExecutor(elasticJob));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static JobItemExecutor getJobItemExecutor(final ElasticJob elasticJob) {
if (null == elasticJob) {
- return new ScriptJobExecutor(jobFacade);
+ return new ScriptJobExecutor();
}
if (elasticJob instanceof SimpleJob) {
- return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
+ return new SimpleJobExecutor();
}
if (elasticJob instanceof DataflowJob) {
- return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
+ return new DataflowJobExecutor();
}
throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java
index 7fa27d1..05a7258 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutor.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.elasticjob.lite.executor.type;
import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.lite.api.dataflow.DataflowJob;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
import org.apache.shardingsphere.elasticjob.lite.config.dataflow.DataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
import java.util.List;
@@ -28,48 +28,43 @@ import java.util.List;
/**
* Dataflow job executor.
*/
-public final class DataflowJobExecutor extends ElasticJobExecutor {
-
- private final DataflowJob<Object> dataflowJob;
-
- public DataflowJobExecutor(final DataflowJob<Object> dataflowJob, final JobFacade jobFacade) {
- super(jobFacade);
- this.dataflowJob = dataflowJob;
- }
+public final class DataflowJobExecutor implements JobItemExecutor<DataflowJob> {
@Override
- protected void process(final ShardingContext shardingContext) {
- DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
+ public void process(final DataflowJob elasticJob, final JobRootConfiguration jobRootConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
+ DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) jobRootConfig.getTypeConfig();
if (dataflowConfig.isStreamingProcess()) {
- streamingExecute(shardingContext);
+ streamingExecute(elasticJob, jobFacade, shardingContext);
} else {
- oneOffExecute(shardingContext);
+ oneOffExecute(elasticJob, shardingContext);
}
}
- private void streamingExecute(final ShardingContext shardingContext) {
- List<Object> data = fetchData(shardingContext);
+ private void streamingExecute(final DataflowJob elasticJob, final JobFacade jobFacade, final ShardingContext shardingContext) {
+ List<Object> data = fetchData(elasticJob, shardingContext);
while (null != data && !data.isEmpty()) {
- processData(shardingContext, data);
- if (!getJobFacade().isEligibleForJobRunning()) {
+ processData(elasticJob, shardingContext, data);
+ if (!jobFacade.isEligibleForJobRunning()) {
break;
}
- data = fetchData(shardingContext);
+ data = fetchData(elasticJob, shardingContext);
}
}
- private void oneOffExecute(final ShardingContext shardingContext) {
- List<Object> data = fetchData(shardingContext);
+ private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
+ List<Object> data = fetchData(elasticJob, shardingContext);
if (null != data && !data.isEmpty()) {
- processData(shardingContext, data);
+ processData(elasticJob, shardingContext, data);
}
}
- private List<Object> fetchData(final ShardingContext shardingContext) {
- return dataflowJob.fetchData(shardingContext);
+ @SuppressWarnings("unchecked")
+ private List<Object> fetchData(final DataflowJob elasticJob, final ShardingContext shardingContext) {
+ return elasticJob.fetchData(shardingContext);
}
- private void processData(final ShardingContext shardingContext, final List<Object> data) {
- dataflowJob.processData(shardingContext, data);
+ @SuppressWarnings("unchecked")
+ private void processData(final DataflowJob elasticJob, final ShardingContext shardingContext, final List<Object> data) {
+ elasticJob.processData(shardingContext, data);
}
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/JobItemExecutor.java
similarity index 63%
copy from elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
copy to elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/JobItemExecutor.java
index dc4f815..e746d9d 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/JobItemExecutor.java
@@ -17,25 +17,25 @@
package org.apache.shardingsphere.elasticjob.lite.executor.type;
+import org.apache.shardingsphere.elasticjob.lite.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
/**
- * Simple job executor.
+ * Job item executor.
+ *
+ * @param <T> type of ElasticJob
*/
-public final class SimpleJobExecutor extends ElasticJobExecutor {
+public interface JobItemExecutor<T extends ElasticJob> {
- private final SimpleJob simpleJob;
-
- public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
- super(jobFacade);
- this.simpleJob = simpleJob;
- }
-
- @Override
- protected void process(final ShardingContext shardingContext) {
- simpleJob.execute(shardingContext);
- }
+ /**
+ * Process job item.
+ *
+ * @param elasticJob elastic job
+ * @param jobRootConfig job root configuration
+ * @param jobFacade job facade
+ * @param shardingContext sharding context
+ */
+ void process(T elasticJob, JobRootConfiguration jobRootConfig, JobFacade jobFacade, ShardingContext shardingContext);
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java
index 0cb5f01..6e7cbfd 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutor.java
@@ -21,9 +21,10 @@ import com.google.common.base.Strings;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.lite.api.script.ScriptJob;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
import org.apache.shardingsphere.elasticjob.lite.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.lite.util.json.GsonFactory;
@@ -32,15 +33,11 @@ import java.io.IOException;
/**
* Script job executor.
*/
-public final class ScriptJobExecutor extends ElasticJobExecutor {
-
- public ScriptJobExecutor(final JobFacade jobFacade) {
- super(jobFacade);
- }
+public final class ScriptJobExecutor implements JobItemExecutor<ScriptJob> {
@Override
- protected void process(final ShardingContext shardingContext) {
- final String scriptCommandLine = ((ScriptJobConfiguration) getJobRootConfig().getTypeConfig()).getScriptCommandLine();
+ public void process(final ScriptJob elasticJob, final JobRootConfiguration jobRootConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
+ String scriptCommandLine = ((ScriptJobConfiguration) jobRootConfig.getTypeConfig()).getScriptCommandLine();
if (Strings.isNullOrEmpty(scriptCommandLine)) {
throw new JobConfigurationException("Cannot find script command line for job '%s', job is not executed.", shardingContext.getJobName());
}
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
index dc4f815..f7321b5 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutor.java
@@ -19,23 +19,16 @@ package org.apache.shardingsphere.elasticjob.lite.executor.type;
import org.apache.shardingsphere.elasticjob.lite.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.lite.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.config.JobRootConfiguration;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
/**
* Simple job executor.
*/
-public final class SimpleJobExecutor extends ElasticJobExecutor {
-
- private final SimpleJob simpleJob;
-
- public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
- super(jobFacade);
- this.simpleJob = simpleJob;
- }
+public final class SimpleJobExecutor implements JobItemExecutor<SimpleJob> {
@Override
- protected void process(final ShardingContext shardingContext) {
- simpleJob.execute(shardingContext);
+ public void process(final SimpleJob elasticJob, final JobRootConfiguration jobRootConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
+ elasticJob.execute(shardingContext);
}
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java
index ca4c311..51f27d2 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/JobExecutorFactoryTest.java
@@ -17,8 +17,10 @@
package org.apache.shardingsphere.elasticjob.lite.executor;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.lite.executor.type.DataflowJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.executor.type.JobItemExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.type.ScriptJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.type.SimpleJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.fixture.config.TestDataflowJobConfiguration;
@@ -33,6 +35,8 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -47,19 +51,19 @@ public final class JobExecutorFactoryTest {
@Test
public void assertGetJobExecutorForScriptJob() {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh", IgnoreJobExceptionHandler.class));
- assertThat(JobExecutorFactory.getJobExecutor(null, jobFacade), instanceOf(ScriptJobExecutor.class));
+ assertThat(getJobItemExecutor(JobExecutorFactory.getJobExecutor(null, jobFacade)), instanceOf(ScriptJobExecutor.class));
}
@Test
public void assertGetJobExecutorForSimpleJob() {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
- assertThat(JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade), instanceOf(SimpleJobExecutor.class));
+ assertThat(getJobItemExecutor(JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade)), instanceOf(SimpleJobExecutor.class));
}
@Test
public void assertGetJobExecutorForDataflowJob() {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(false));
- assertThat(JobExecutorFactory.getJobExecutor(new TestDataflowJob(null), jobFacade), instanceOf(DataflowJobExecutor.class));
+ assertThat(getJobItemExecutor(JobExecutorFactory.getJobExecutor(new TestDataflowJob(null), jobFacade)), instanceOf(DataflowJobExecutor.class));
}
@Test(expected = JobConfigurationException.class)
@@ -74,4 +78,11 @@ public final class JobExecutorFactoryTest {
ElasticJobExecutor anotherExecutor = JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade);
assertTrue(executor.hashCode() != anotherExecutor.hashCode());
}
+
+ @SneakyThrows
+ private JobItemExecutor getJobItemExecutor(final ElasticJobExecutor elasticJobExecutor) {
+ Field field = ElasticJobExecutor.class.getDeclaredField("jobItemExecutor");
+ field.setAccessible(true);
+ return (JobItemExecutor) field.get(elasticJobExecutor);
+ }
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java
index 46a0b55..a5a8af2 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/DataflowJobExecutorTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.executor.type;
+import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
@@ -49,7 +50,7 @@ public final class DataflowJobExecutorTest {
private ShardingContexts shardingContexts;
- private DataflowJobExecutor dataflowJobExecutor;
+ private ElasticJobExecutor elasticJobExecutor;
@After
public void tearDown() {
@@ -62,7 +63,7 @@ public final class DataflowJobExecutorTest {
setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
when(jobCaller.fetchData(0)).thenReturn(null);
when(jobCaller.fetchData(1)).thenReturn(Collections.emptyList());
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller).fetchData(0);
verify(jobCaller).fetchData(1);
verify(jobCaller, times(0)).processData(any());
@@ -72,7 +73,7 @@ public final class DataflowJobExecutorTest {
public void assertExecuteWhenFetchDataIsNotEmptyForUnStreamingProcessAndSingleShardingItem() {
setUp(false, ShardingContextsBuilder.getSingleShardingContexts());
doThrow(new IllegalStateException()).when(jobCaller).fetchData(0);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller).fetchData(0);
verify(jobCaller, times(0)).processData(any());
}
@@ -83,7 +84,7 @@ public final class DataflowJobExecutorTest {
when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
doThrow(new IllegalStateException()).when(jobCaller).processData(4);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller).fetchData(0);
verify(jobCaller).fetchData(1);
verify(jobCaller).processData(1);
@@ -98,7 +99,7 @@ public final class DataflowJobExecutorTest {
setUp(true, ShardingContextsBuilder.getSingleShardingContexts());
when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
verify(jobCaller).processData(1);
}
@@ -110,7 +111,7 @@ public final class DataflowJobExecutorTest {
when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
when(jobCaller.fetchData(1)).thenReturn(Collections.singletonList(2), Collections.emptyList());
when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
verify(jobCaller, times(2)).fetchData(1);
verify(jobCaller).processData(1);
@@ -125,7 +126,7 @@ public final class DataflowJobExecutorTest {
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(2, 3), Collections.emptyList());
when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
doThrow(new IllegalStateException()).when(jobCaller).processData(2);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
verify(jobCaller, times(1)).fetchData(1);
verify(jobCaller).processData(1);
@@ -141,7 +142,7 @@ public final class DataflowJobExecutorTest {
when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2), Collections.emptyList());
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4), Collections.emptyList());
doThrow(new IllegalStateException()).when(jobCaller).processData(4);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
verify(jobCaller, times(1)).fetchData(1);
verify(jobCaller).processData(1);
@@ -157,7 +158,7 @@ public final class DataflowJobExecutorTest {
when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
doThrow(new IllegalStateException()).when(jobCaller).processData(4);
- dataflowJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobCaller).fetchData(0);
verify(jobCaller).fetchData(1);
verify(jobCaller).processData(1);
@@ -170,7 +171,7 @@ public final class DataflowJobExecutorTest {
this.shardingContexts = shardingContexts;
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(isStreamingProcess));
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- dataflowJobExecutor = new DataflowJobExecutor(new TestDataflowJob(jobCaller), jobFacade);
+ elasticJobExecutor = new ElasticJobExecutor(new TestDataflowJob(jobCaller), jobFacade, new DataflowJobExecutor());
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
}
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java
index 7704856..2ef8285 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/ScriptJobExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.executor.type;
import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
+import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.fixture.ShardingContextsBuilder;
@@ -38,14 +39,14 @@ public final class ScriptJobExecutorTest {
@Mock
private JobFacade jobFacade;
- private ScriptJobExecutor scriptJobExecutor;
+ private ElasticJobExecutor elasticJobExecutor;
@Test
public void assertExecuteWhenCommandLineIsEmpty() {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, ShardingContextsBuilder.getMultipleShardingContexts());
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("", IgnoreJobExceptionHandler.class));
- scriptJobExecutor = new ScriptJobExecutor(jobFacade);
- scriptJobExecutor.execute();
+ elasticJobExecutor = new ElasticJobExecutor(null, jobFacade, new ScriptJobExecutor());
+ elasticJobExecutor.execute();
}
@Test(expected = JobSystemException.class)
@@ -61,8 +62,8 @@ public final class ScriptJobExecutorTest {
private void assertExecuteWhenExecuteFailure(final ShardingContexts shardingContexts) {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("not_exists_file", ThrowJobExceptionHandler.class));
- scriptJobExecutor = new ScriptJobExecutor(jobFacade);
- scriptJobExecutor.execute();
+ elasticJobExecutor = new ElasticJobExecutor(null, jobFacade, new ScriptJobExecutor());
+ elasticJobExecutor.execute();
}
@Test
@@ -78,8 +79,8 @@ public final class ScriptJobExecutorTest {
private void assertExecuteSuccess(final ShardingContexts shardingContexts) {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("exists_file param0 param1", IgnoreJobExceptionHandler.class));
- scriptJobExecutor = new ScriptJobExecutor(jobFacade);
- scriptJobExecutor.execute();
+ elasticJobExecutor = new ElasticJobExecutor(null, jobFacade, new ScriptJobExecutor());
+ elasticJobExecutor.execute();
verify(jobFacade).loadJobRootConfiguration(true);
}
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java
index be0816a..1f02ca5 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/SimpleJobExecutorTest.java
@@ -54,21 +54,21 @@ public final class SimpleJobExecutorTest {
@Mock
private JobFacade jobFacade;
- private SimpleJobExecutor simpleJobExecutor;
+ private ElasticJobExecutor elasticJobExecutor;
@Before
public void setUp() {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
- simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
+ elasticJobExecutor = new ElasticJobExecutor(new TestSimpleJob(jobCaller), jobFacade, new SimpleJobExecutor());
}
@Test
public void assertNewExecutorWithDefaultHandlers() throws NoSuchFieldException {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration("ErrorHandler", Object.class.getName()));
- SimpleJobExecutor simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
- assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, ElasticJobExecutor.class.getDeclaredField("executorService")),
+ elasticJobExecutor = new ElasticJobExecutor(new TestSimpleJob(jobCaller), jobFacade, new SimpleJobExecutor());
+ assertThat(ReflectionUtils.getFieldValue(elasticJobExecutor, ElasticJobExecutor.class.getDeclaredField("executorService")),
instanceOf(new DefaultExecutorServiceHandler().createExecutorService("test_job").getClass()));
- assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, ElasticJobExecutor.class.getDeclaredField("jobExceptionHandler")),
+ assertThat(ReflectionUtils.getFieldValue(elasticJobExecutor, ElasticJobExecutor.class.getDeclaredField("jobExceptionHandler")),
instanceOf(DefaultJobExceptionHandler.class));
}
@@ -76,7 +76,7 @@ public final class SimpleJobExecutorTest {
public void assertExecuteWhenCheckMaxTimeDiffSecondsIntolerable() throws JobExecutionEnvironmentException {
doThrow(JobExecutionEnvironmentException.class).when(jobFacade).checkJobExecutionEnvironment();
try {
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
} finally {
verify(jobFacade).checkJobExecutionEnvironment();
verify(jobCaller, times(0)).execute();
@@ -88,7 +88,7 @@ public final class SimpleJobExecutorTest {
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true);
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED,
"Previous job 'test_job' - shardingItems '[]' is still running, misfired job will start after previous job completed.");
@@ -102,7 +102,7 @@ public final class SimpleJobExecutorTest {
public void assertExecuteWhenShardingItemsIsEmpty() throws JobExecutionEnvironmentException {
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, "Sharding item for job 'test_job' is empty.");
verify(jobFacade).checkJobExecutionEnvironment();
@@ -125,7 +125,7 @@ public final class SimpleJobExecutorTest {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
doThrow(RuntimeException.class).when(jobCaller).execute();
try {
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
} finally {
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_RUNNING, "");
@@ -158,7 +158,7 @@ public final class SimpleJobExecutorTest {
private void assertExecuteWhenRunOnceSuccess(final ShardingContexts shardingContexts) {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, "");
ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
@@ -170,7 +170,7 @@ public final class SimpleJobExecutorTest {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
verify(jobCaller, times(2)).execute();
}
@@ -180,7 +180,7 @@ public final class SimpleJobExecutorTest {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
verify(jobCaller, times(2)).execute();
verify(jobFacade, times(0)).clearMisfire(shardingContexts.getShardingItemParameters().keySet());
@@ -192,7 +192,7 @@ public final class SimpleJobExecutorTest {
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true, false);
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade, times(2)).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_RUNNING, "");
verify(jobFacade).checkJobExecutionEnvironment();
@@ -210,7 +210,7 @@ public final class SimpleJobExecutorTest {
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
doThrow(RuntimeException.class).when(jobFacade).beforeJobExecuted(shardingContexts);
try {
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
} finally {
verify(jobCaller, times(0)).execute();
}
@@ -224,7 +224,7 @@ public final class SimpleJobExecutorTest {
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
doThrow(RuntimeException.class).when(jobFacade).afterJobExecuted(shardingContexts);
try {
- simpleJobExecutor.execute();
+ elasticJobExecutor.execute();
} finally {
verify(jobCaller, times(2)).execute();
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java
index 49c812d..33a3000 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/WrongJobExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.executor.type;
import org.apache.shardingsphere.elasticjob.lite.event.type.JobStatusTraceEvent.State;
+import org.apache.shardingsphere.elasticjob.lite.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.fixture.config.TestSimpleJobConfiguration;
@@ -40,12 +41,12 @@ public final class WrongJobExecutorTest {
@Mock
private JobFacade jobFacade;
- private SimpleJobExecutor wrongSimpleJobExecutor;
+ private ElasticJobExecutor wrongJobExecutor;
@Before
public void setUp() {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
- wrongSimpleJobExecutor = new SimpleJobExecutor(new TestWrongJob(), jobFacade);
+ wrongJobExecutor = new ElasticJobExecutor(new TestWrongJob(), jobFacade, new SimpleJobExecutor());
}
@Test(expected = RuntimeException.class)
@@ -54,7 +55,7 @@ public final class WrongJobExecutorTest {
map.put(0, "A");
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", map);
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- wrongSimpleJobExecutor.execute();
+ wrongJobExecutor.execute();
}
@Test
@@ -64,7 +65,7 @@ public final class WrongJobExecutorTest {
map.put(1, "B");
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", map);
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- wrongSimpleJobExecutor.execute();
+ wrongJobExecutor.execute();
verify(jobFacade).getShardingContexts();
verify(jobFacade).postJobStatusTraceEvent("fake_task_id", State.TASK_RUNNING, "");
}