You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/06 04:22:26 UTC

[shardingsphere-elasticjob-lite] branch master updated: Fix the job can not enable for disabled job integrate test sometimes #953 (#959)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git


The following commit(s) were added to refs/heads/master by this push:
     new d410d4f   Fix the job can not enable for disabled job integrate test sometimes #953 (#959)
d410d4f is described below

commit d410d4f5c2da1fec796344a1c2bd2363b6a34b47
Author: keker <as...@163.com>
AuthorDate: Mon Jul 6 12:22:19 2020 +0800

     Fix the job can not enable for disabled job integrate test sometimes #953 (#959)
---
 .../AbstractDistributeOnceElasticJobListener.java  | 14 +++++++--
 .../lite/internal/guarantee/GuaranteeService.java  | 34 ++++++++++++++++++++--
 .../DistributeOnceElasticJobListenerTest.java      |  6 ++++
 .../disable/ScheduleDisabledJobIntegrateTest.java  | 11 ++++---
 .../internal/guarantee/GuaranteeServiceTest.java   | 24 +++++++++++++++
 5 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
index 542047d..3c80b98 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
@@ -17,10 +17,12 @@
 
 package org.apache.shardingsphere.elasticjob.lite.api.listener;
 
+import java.util.Set;
 import lombok.Setter;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
 import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
+import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
 import org.apache.shardingsphere.elasticjob.lite.util.env.TimeService;
 
 /**
@@ -56,7 +58,11 @@ public abstract class AbstractDistributeOnceElasticJobListener implements Elasti
     
     @Override
     public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
-        guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
+        Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
+        guaranteeService.registerStart(shardingItems);
+        while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
+            BlockUtils.waitingShortTime();
+        }
         if (guaranteeService.isAllStarted()) {
             doBeforeJobExecutedAtLastStarted(shardingContexts);
             guaranteeService.clearAllStartedInfo();
@@ -78,7 +84,11 @@ public abstract class AbstractDistributeOnceElasticJobListener implements Elasti
     
     @Override
     public final void afterJobExecuted(final ShardingContexts shardingContexts) {
-        guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet());
+        Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
+        guaranteeService.registerComplete(shardingItems);
+        while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) {
+            BlockUtils.waitingShortTime();
+        }
         if (guaranteeService.isAllCompleted()) {
             doAfterJobExecutedAtLastCompleted(shardingContexts);
             guaranteeService.clearAllCompletedInfo();
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
index e6db406..d8f0da3 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
@@ -39,8 +39,8 @@ public final class GuaranteeService {
     
     /**
      * Register start.
-     * 
-     * @param shardingItems to be registered sharding items 
+     *
+     * @param shardingItems to be registered sharding items
      */
     public void registerStart(final Collection<Integer> shardingItems) {
         for (int each : shardingItems) {
@@ -49,6 +49,21 @@ public final class GuaranteeService {
     }
     
     /**
+     * Judge whether the started node exists for every one of the given sharding items.
+     *
+     * @param shardingItems sharding items to check
+     * @return true if a started node exists for each given sharding item, false otherwise
+     */
+    public boolean isRegisterStartSuccess(final Collection<Integer> shardingItems) {
+        for (int each : shardingItems) {
+            if (!jobNodeStorage.isJobNodeExisted(GuaranteeNode.getStartedNode(each))) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
+    /**
      * Judge whether job's sharding items are all started.
      *
      * @return job's sharding items are all started or not
@@ -77,6 +92,21 @@ public final class GuaranteeService {
     }
     
     /**
+     * Judge whether the completed node exists for every one of the given sharding items.
+     *
+     * @param shardingItems sharding items to check
+     * @return true if a completed node exists for each given sharding item, false otherwise
+     */
+    public boolean isRegisterCompleteSuccess(final Collection<Integer> shardingItems) {
+        for (int each : shardingItems) {
+            if (!jobNodeStorage.isJobNodeExisted(GuaranteeNode.getCompletedNode(each))) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
+    /**
      * Judge whether job's sharding items are all completed.
      *
      * @return job's sharding items are all completed or not
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
index e85137b..1465643 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
@@ -68,6 +68,7 @@ public final class DistributeOnceElasticJobListenerTest {
     
     @Test
     public void assertBeforeJobExecutedWhenIsAllStarted() {
+        when(guaranteeService.isRegisterStartSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
         when(guaranteeService.isAllStarted()).thenReturn(true);
         distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
         verify(guaranteeService).registerStart(Sets.newHashSet(0, 1));
@@ -77,6 +78,7 @@ public final class DistributeOnceElasticJobListenerTest {
     
     @Test
     public void assertBeforeJobExecutedWhenIsNotAllStartedAndNotTimeout() {
+        when(guaranteeService.isRegisterStartSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
         when(guaranteeService.isAllStarted()).thenReturn(false);
         when(timeService.getCurrentMillis()).thenReturn(0L);
         distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
@@ -86,6 +88,7 @@ public final class DistributeOnceElasticJobListenerTest {
     
     @Test(expected = JobSystemException.class)
     public void assertBeforeJobExecutedWhenIsNotAllStartedAndTimeout() {
+        when(guaranteeService.isRegisterStartSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
         when(guaranteeService.isAllStarted()).thenReturn(false);
         when(timeService.getCurrentMillis()).thenReturn(0L, 2L);
         distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
@@ -95,6 +98,7 @@ public final class DistributeOnceElasticJobListenerTest {
     
     @Test
     public void assertAfterJobExecutedWhenIsAllCompleted() {
+        when(guaranteeService.isRegisterCompleteSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
         when(guaranteeService.isAllCompleted()).thenReturn(true);
         distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
         verify(guaranteeService).registerComplete(Sets.newHashSet(0, 1));
@@ -104,6 +108,7 @@ public final class DistributeOnceElasticJobListenerTest {
     
     @Test
     public void assertAfterJobExecutedWhenIsAllCompletedAndNotTimeout() {
+        when(guaranteeService.isRegisterCompleteSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
         when(guaranteeService.isAllCompleted()).thenReturn(false);
         when(timeService.getCurrentMillis()).thenReturn(0L);
         distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
@@ -113,6 +118,7 @@ public final class DistributeOnceElasticJobListenerTest {
     
     @Test(expected = JobSystemException.class)
     public void assertAfterJobExecutedWhenIsAllCompletedAndTimeout() {
+        when(guaranteeService.isRegisterCompleteSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
         when(guaranteeService.isAllCompleted()).thenReturn(false);
         when(timeService.getCurrentMillis()).thenReturn(0L, 2L);
         distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
index df599be..d7ab21e 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.integrate.disable;
 
 import org.apache.shardingsphere.elasticjob.lite.api.job.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.fixture.job.DetailedFooJob;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerStatus;
 import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
@@ -41,10 +42,9 @@ public final class ScheduleDisabledJobIntegrateTest extends DisabledJobIntegrate
         BlockUtils.waitingShortTime();
         assertDisabledRegCenterInfo();
         setJobEnable();
-        // TODO the job can not enable sometimes 
-//        while (!((DetailedFooJob) getElasticJob()).isCompleted()) {
-//            BlockUtils.waitingShortTime();
-//        }
+        while (!((DetailedFooJob) getElasticJob()).isCompleted()) {
+            BlockUtils.waitingShortTime();
+        }
         assertEnabledRegCenterInfo();
     }
     
@@ -55,7 +55,6 @@ public final class ScheduleDisabledJobIntegrateTest extends DisabledJobIntegrate
     private void assertEnabledRegCenterInfo() {
         assertTrue(getRegCenter().isExisted("/" + getJobName() + "/instances/" + JobRegistry.getInstance().getJobInstance(getJobName()).getJobInstanceId()));
         getRegCenter().remove("/" + getJobName() + "/leader/election");
-        // TODO the job can not enable sometimes, so can not assert if job not schedule
-//        assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
+        assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
     }
 }
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
index fdfb4ad..f82b89f 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
@@ -59,6 +59,18 @@ public final class GuaranteeServiceTest {
     }
     
     @Test
+    public void assertIsNotRegisterStartSuccess() {
+        assertFalse(guaranteeService.isRegisterStartSuccess(Arrays.asList(0, 1)));
+    }
+    
+    @Test
+    public void assertIsRegisterStartSuccess() {
+        when(jobNodeStorage.isJobNodeExisted("guarantee/started/0")).thenReturn(true);
+        when(jobNodeStorage.isJobNodeExisted("guarantee/started/1")).thenReturn(true);
+        assertTrue(guaranteeService.isRegisterStartSuccess(Arrays.asList(0, 1)));
+    }
+    
+    @Test
     public void assertIsNotAllStartedWhenRootNodeIsNotExisted() {
         when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(false);
         assertFalse(guaranteeService.isAllStarted());
@@ -95,6 +107,18 @@ public final class GuaranteeServiceTest {
     }
     
     @Test
+    public void assertIsNotRegisterCompleteSuccess() {
+        assertFalse(guaranteeService.isRegisterCompleteSuccess(Arrays.asList(0, 1)));
+    }
+    
+    @Test
+    public void assertIsRegisterCompleteSuccess() {
+        when(jobNodeStorage.isJobNodeExisted("guarantee/completed/0")).thenReturn(true);
+        when(jobNodeStorage.isJobNodeExisted("guarantee/completed/1")).thenReturn(true);
+        assertTrue(guaranteeService.isRegisterCompleteSuccess(Arrays.asList(0, 1)));
+    }
+    
+    @Test
     public void assertIsNotAllCompletedWhenRootNodeIsNotExisted() {
         when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(false);
         assertFalse(guaranteeService.isAllCompleted());