You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/03 06:55:25 UTC

[shardingsphere-elasticjob-lite] branch master updated: Decouple job type and facade class (#913)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git


The following commit(s) were added to refs/heads/master by this push:
     new aea1d06  Decouple job type and facade class (#913)
aea1d06 is described below

commit aea1d06c598bd1683100687bf1860ba930046197
Author: Liang Zhang <te...@163.com>
AuthorDate: Fri Jul 3 14:55:13 2020 +0800

    Decouple job type and facade class (#913)
    
    * Decouple DataflowJobExecutor and LiteJobFacade.isEligibleForJobRunning
    
    * Remove LiteJobFacade.isEligibleForJobRunning
---
 .../elasticjob/lite/executor/JobFacade.java        | 12 ---------
 .../executor/type/impl/DataflowJobExecutor.java    | 10 +++++---
 .../lite/internal/schedule/LiteJobFacade.java      | 14 +---------
 .../type/impl/DataflowJobExecutorTest.java         |  6 +----
 .../lite/internal/schedule/LiteJobFacadeTest.java  | 30 +---------------------
 5 files changed, 10 insertions(+), 62 deletions(-)

diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java
index e8cc76a..dd9c614 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/JobFacade.java
@@ -94,18 +94,6 @@ public interface JobFacade {
     boolean isExecuteMisfired(Collection<Integer> shardingItems);
     
     /**
-     * Judge job whether eligible running.
-     * 
-     * <p>The ineligible job includes:
-     * 1. Need to shutdown;
-     * 2. Need to resharding;
-     * 3. Not stream job.
-     * 
-     * @return job is eligible running or not
-     */
-    boolean isEligibleForJobRunning();
-    
-    /**
      * Judge job whether need resharding.
      *
      * @return whether need resharding
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java
index 0f03ecb..cfbb5fa 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutor.java
@@ -35,23 +35,27 @@ public final class DataflowJobExecutor implements JobItemExecutor<DataflowJob> {
     @Override
     public void process(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
         if (Boolean.parseBoolean(jobConfig.getProps().getOrDefault(STREAM_PROCESS_KEY, false).toString())) {
-            streamingExecute(elasticJob, jobFacade, shardingContext);
+            streamingExecute(elasticJob, jobConfig, jobFacade, shardingContext);
         } else {
             oneOffExecute(elasticJob, shardingContext);
         }
     }
     
-    private void streamingExecute(final DataflowJob elasticJob, final JobFacade jobFacade, final ShardingContext shardingContext) {
+    private void streamingExecute(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
         List<Object> data = fetchData(elasticJob, shardingContext);
         while (null != data && !data.isEmpty()) {
             processData(elasticJob, shardingContext, data);
-            if (!jobFacade.isEligibleForJobRunning()) {
+            if (!isEligibleForJobRunning(jobConfig, jobFacade)) {
                 break;
             }
             data = fetchData(elasticJob, shardingContext);
         }
     }
     
+    private boolean isEligibleForJobRunning(final JobConfiguration jobConfig, final JobFacade jobFacade) {
+        return !jobFacade.isNeedSharding() && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobExecutor.STREAM_PROCESS_KEY, false).toString());
+    }
+    
     private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
         List<Object> data = fetchData(elasticJob, shardingContext);
         if (null != data && !data.isEmpty()) {
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
index 6948a9c..f3d6df9 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
@@ -19,14 +19,12 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.lite.api.type.JobType;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.context.TaskContext;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobExecutionEnvironmentException;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
@@ -133,17 +131,7 @@ public final class LiteJobFacade implements JobFacade {
     
     @Override
     public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
-        return isEligibleForJobRunning() && configService.load(true).isMisfire() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
-    }
-    
-    @Override
-    public boolean isEligibleForJobRunning() {
-        JobConfiguration jobConfig = configService.load(true);
-        if (JobType.DATAFLOW == jobConfig.getJobType()) {
-            return !shardingService.isNeedSharding()
-                    && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobExecutor.STREAM_PROCESS_KEY, false).toString());
-        }
-        return !shardingService.isNeedSharding();
+        return !isNeedSharding() && configService.load(true).isMisfire() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
     }
     
     @Override
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
index bd374ce..89726cd 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/executor/type/impl/DataflowJobExecutorTest.java
@@ -103,7 +103,6 @@ public final class DataflowJobExecutorTest {
     public void assertExecuteWhenFetchDataIsNotEmptyForStreamingProcessAndSingleShardingItem() {
         setUp(true, ShardingContextsBuilder.getSingleShardingContexts());
         when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
-        when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
         elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
         verify(jobCaller).processData(1);
@@ -115,7 +114,6 @@ public final class DataflowJobExecutorTest {
         setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
         when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
         when(jobCaller.fetchData(1)).thenReturn(Collections.singletonList(2), Collections.emptyList());
-        when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
         elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
         verify(jobCaller, times(2)).fetchData(1);
@@ -129,7 +127,6 @@ public final class DataflowJobExecutorTest {
         setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
         when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(2, 3), Collections.emptyList());
-        when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
         doThrow(new IllegalStateException()).when(jobCaller).processData(2);
         elasticJobExecutor.execute();
         verify(jobCaller, times(2)).fetchData(0);
@@ -143,7 +140,6 @@ public final class DataflowJobExecutorTest {
     @Test
     public void assertExecuteWhenFetchDataIsNotEmptyAndIsEligibleForJobRunningForStreamingProcess() {
         setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
-        when(jobFacade.isEligibleForJobRunning()).thenReturn(true);
         when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2), Collections.emptyList());
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4), Collections.emptyList());
         doThrow(new IllegalStateException()).when(jobCaller).processData(4);
@@ -159,7 +155,7 @@ public final class DataflowJobExecutorTest {
     @Test
     public void assertExecuteWhenFetchDataIsNotEmptyAndIsNotEligibleForJobRunningForStreamingProcess() {
         setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
-        when(jobFacade.isEligibleForJobRunning()).thenReturn(false);
+        when(jobFacade.isNeedSharding()).thenReturn(true);
         when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
         when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
         doThrow(new IllegalStateException()).when(jobCaller).processData(4);
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
index 9524b6a..a798f71 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
@@ -18,13 +18,12 @@
 package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
 
 import com.google.common.collect.Lists;
-import org.apache.shardingsphere.elasticjob.lite.api.type.JobType;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.ElasticJobListenerCaller;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.TestElasticJobListener;
+import org.apache.shardingsphere.elasticjob.lite.api.type.JobType;
 import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobExecutionEnvironmentException;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
@@ -210,33 +209,6 @@ public final class LiteJobFacadeTest {
     }
     
     @Test
-    public void assertNotEligibleForJobRunningWhenNeedSharding() {
-        when(configService.load(true)).thenReturn(
-                JobConfiguration.newBuilder("test_job", JobType.SIMPLE, 3).cron("0/1 * * * * ?").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build());
-        when(shardingService.isNeedSharding()).thenReturn(true);
-        assertThat(liteJobFacade.isEligibleForJobRunning(), is(false));
-        verify(shardingService).isNeedSharding();
-    }
-    
-    @Test
-    public void assertNotEligibleForJobRunningWhenUnStreamingProcess() {
-        when(configService.load(true)).thenReturn(
-                JobConfiguration.newBuilder("test_job", JobType.DATAFLOW, 3).cron("0/1 * * * * ?").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.FALSE.toString()).build());
-        assertThat(liteJobFacade.isEligibleForJobRunning(), is(false));
-        verify(configService).load(true);
-    }
-    
-    @Test
-    public void assertEligibleForJobRunningWhenNotNeedShardingAndStreamingProcess() {
-        when(shardingService.isNeedSharding()).thenReturn(false);
-        when(configService.load(true)).thenReturn(
-                JobConfiguration.newBuilder("test_job", JobType.DATAFLOW, 3).cron("0/1 * * * * ?").setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build());
-        assertThat(liteJobFacade.isEligibleForJobRunning(), is(true));
-        verify(shardingService).isNeedSharding();
-        verify(configService).load(true);
-    }
-    
-    @Test
     public void assertPostJobExecutionEvent() {
         liteJobFacade.postJobExecutionEvent(null);
         verify(jobEventBus).post(null);