You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/03 06:55:25 UTC
[shardingsphere-elasticjob-lite] branch master updated: Decouple
job type and facade class (#913)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push:
new aea1d06 Decouple job type and facade class (#913)
aea1d06 is described below
commit aea1d06c598bd1683100687bf1860ba930046197
Author: Liang Zhang <te...@163.com>
AuthorDate: Fri Jul 3 14:55:13 2020 +0800
Decouple job type and facade class (#913)
* Decouple DataflowJobExecutor and LiteJobFacade.isEligibleForJobRunning
* Remove LiteJobFacade.isEligibleForJobRunning
---
.../elasticjob/lite/executor/JobFacade.java | 12 ---------
.../executor/type/impl/DataflowJobExecutor.java | 10 +++++---
.../lite/internal/schedule/LiteJobFacade.java | 14 +---------
.../type/impl/DataflowJobExecutorTest.java | 6 +----
.../lite/internal/schedule/LiteJobFacadeTest.java | 30 +---------------------
5 files changed, 10 insertions(+), 62 deletions(-)
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java
index e8cc76a..dd9c614 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java
@@ -94,18 +94,6 @@ public interface JobFacade {
boolean isExecuteMisfired(Collection<Integer> shardingItems);
/**
- * Judge job whether eligible running.
- *
- * <p>The ineligible job includes:
- * 1. Need to shutdown;
- * 2. Need to resharding;
- * 3. Not stream job.
- *
- * @return job is eligible running or not
- */
- boolean isEligibleForJobRunning();
-
- /**
* Judge job whether need resharding.
*
* @return whether need resharding
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java
index 0f03ecb..cfbb5fa 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java
@@ -35,23 +35,27 @@ public final class DataflowJobExecutor implements JobItemExecutor<DataflowJob> {
@Override
public void process(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
if (Boolean.parseBoolean(jobConfig.getProps().getOrDefault(STREAM_PROCESS_KEY, false).toString())) {
- streamingExecute(elasticJob, jobFacade, shardingContext);
+ streamingExecute(elasticJob, jobConfig, jobFacade, shardingContext);
} else {
oneOffExecute(elasticJob, shardingContext);
}
}
- private void streamingExecute(final DataflowJob elasticJob, final JobFacade jobFacade, final ShardingContext shardingContext) {
+ private void streamingExecute(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
List<Object> data = fetchData(elasticJob, shardingContext);
while (null != data && !data.isEmpty()) {
processData(elasticJob, shardingContext, data);
- if (!jobFacade.isEligibleForJobRunning()) {
+ if (!isEligibleForJobRunning(jobConfig, jobFacade)) {
break;
}
data = fetchData(elasticJob, shardingContext);
}
}
+ private boolean isEligibleForJobRunning(final JobConfiguration jobConfig, final JobFacade jobFacade) {
+ return !jobFacade.isNeedSharding() && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobExecutor.STREAM_PROCESS_KEY, false).toString());
+ }
+
private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
List<Object> data = fetchData(elasticJob, shardingContext);
if (null != data && !data.isEmpty()) {
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
index 6948a9c..f3d6df9 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
@@ -19,14 +19,12 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.lite.api.type.JobType;
import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.context.TaskContext;
import org.apache.shardingsphere.elasticjob.lite.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
@@ -133,17 +131,7 @@ public final class LiteJobFacade implements JobFacade {
@Override
public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
- return isEligibleForJobRunning() && configService.load(true).isMisfire() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
- }
-
- @Override
- public boolean isEligibleForJobRunning() {
- JobConfiguration jobConfig = configService.load(true);
- if (JobType.DATAFLOW == jobConfig.getJobType()) {
- return !shardingService.isNeedSharding()
- && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobExecutor.STREAM_PROCESS_KEY, false).toString());
- }
- return !shardingService.isNeedSharding();
+ return !isNeedSharding() && configService.load(true).isMisfire() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
}
@Override
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
index bd374ce..89726cd 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
@@ -103,7 +103,6 @@ public final class DataflowJobExecutorTest {
public void assertExecuteWhenFetchDataIsNotEmptyForStreamingProcessAndSingleShardingItem() {
setUp(true, ShardingContextsBuilder.getSingleShardingContexts());
when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
- when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
verify(jobCaller).processData(1);
@@ -115,7 +114,6 @@ public final class DataflowJobExecutorTest {
setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
when(jobCaller.fetchData(1)).thenReturn(Collections.singletonList(2), Collections.emptyList());
- when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
verify(jobCaller, times(2)).fetchData(1);
@@ -129,7 +127,6 @@ public final class DataflowJobExecutorTest {
setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(2, 3), Collections.emptyList());
- when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
doThrow(new IllegalStateException()).when(jobCaller).processData(2);
elasticJobExecutor.execute();
verify(jobCaller, times(2)).fetchData(0);
@@ -143,7 +140,6 @@ public final class DataflowJobExecutorTest {
@Test
public void assertExecuteWhenFetchDataIsNotEmptyAndIsEligibleForJobRunningForStreamingProcess() {
setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2), Collections.emptyList());
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4), Collections.emptyList());
doThrow(new IllegalStateException()).when(jobCaller).processData(4);
@@ -159,7 +155,7 @@ public final class DataflowJobExecutorTest {
@Test
public void assertExecuteWhenFetchDataIsNotEmptyAndIsNotEligibleForJobRunningForStreamingProcess() {
setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobFacade.isEligibleForJobRunning()).thenReturn(false);
+ when(jobFacade.isNeedSharding()).thenReturn(true);
when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
doThrow(new IllegalStateException()).when(jobCaller).processData(4);
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
index 9524b6a..a798f71 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
@@ -18,13 +18,12 @@
package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
import com.google.common.collect.Lists;
-import org.apache.shardingsphere.elasticjob.lite.api.type.JobType;
import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.ElasticJobListenerCaller;
import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.TestElasticJobListener;
+import org.apache.shardingsphere.elasticjob.lite.api.type.JobType;
import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
@@ -210,33 +209,6 @@ public final class LiteJobFacadeTest {
}
@Test
- public void assertNotEligibleForJobRunningWhenNeedSharding() {
- when(configService.load(true)).thenReturn(
- JobConfiguration.newBuilder("test_job", JobType.SIMPLE, 3).cron("0/1 * * * * ?").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build());
- when(shardingService.isNeedSharding()).thenReturn(true);
- assertThat(liteJobFacade.isEligibleForJobRunning(), is(false));
- verify(shardingService).isNeedSharding();
- }
-
- @Test
- public void assertNotEligibleForJobRunningWhenUnStreamingProcess() {
- when(configService.load(true)).thenReturn(
- JobConfiguration.newBuilder("test_job", JobType.DATAFLOW, 3).cron("0/1 * * * * ?").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.FALSE.toString()).build());
- assertThat(liteJobFacade.isEligibleForJobRunning(), is(false));
- verify(configService).load(true);
- }
-
- @Test
- public void assertEligibleForJobRunningWhenNotNeedShardingAndStreamingProcess() {
- when(shardingService.isNeedSharding()).thenReturn(false);
- when(configService.load(true)).thenReturn(
- JobConfiguration.newBuilder("test_job", JobType.DATAFLOW, 3).cron("0/1 * * * * ?").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build());
- assertThat(liteJobFacade.isEligibleForJobRunning(), is(true));
- verify(shardingService).isNeedSharding();
- verify(configService).load(true);
- }
-
- @Test
public void assertPostJobExecutionEvent() {
liteJobFacade.postJobExecutionEvent(null);
verify(jobEventBus).post(null);