You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/06 04:22:26 UTC
[shardingsphere-elasticjob-lite] branch master updated: Fix the job
can not enable for disabled job integrate test sometimes #953 (#959)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push:
new d410d4f Fix the job can not enable for disabled job integrate test sometimes #953 (#959)
d410d4f is described below
commit d410d4f5c2da1fec796344a1c2bd2363b6a34b47
Author: keker <as...@163.com>
AuthorDate: Mon Jul 6 12:22:19 2020 +0800
Fix the job can not enable for disabled job integrate test sometimes #953 (#959)
---
.../AbstractDistributeOnceElasticJobListener.java | 14 +++++++--
.../lite/internal/guarantee/GuaranteeService.java | 34 ++++++++++++++++++++--
.../DistributeOnceElasticJobListenerTest.java | 6 ++++
.../disable/ScheduleDisabledJobIntegrateTest.java | 11 ++++---
.../internal/guarantee/GuaranteeServiceTest.java | 24 +++++++++++++++
5 files changed, 79 insertions(+), 10 deletions(-)
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
index 542047d..3c80b98 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java
@@ -17,10 +17,12 @@
package org.apache.shardingsphere.elasticjob.lite.api.listener;
+import java.util.Set;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.lite.executor.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
+import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.lite.util.env.TimeService;
/**
@@ -56,7 +58,11 @@ public abstract class AbstractDistributeOnceElasticJobListener implements Elasti
@Override
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
- guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
+ Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
+ guaranteeService.registerStart(shardingItems);
+ while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
+ BlockUtils.waitingShortTime();
+ }
if (guaranteeService.isAllStarted()) {
doBeforeJobExecutedAtLastStarted(shardingContexts);
guaranteeService.clearAllStartedInfo();
@@ -78,7 +84,11 @@ public abstract class AbstractDistributeOnceElasticJobListener implements Elasti
@Override
public final void afterJobExecuted(final ShardingContexts shardingContexts) {
- guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet());
+ Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
+ guaranteeService.registerComplete(shardingItems);
+ while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) {
+ BlockUtils.waitingShortTime();
+ }
if (guaranteeService.isAllCompleted()) {
doAfterJobExecutedAtLastCompleted(shardingContexts);
guaranteeService.clearAllCompletedInfo();
diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
index e6db406..d8f0da3 100644
--- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
+++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeService.java
@@ -39,8 +39,8 @@ public final class GuaranteeService {
/**
* Register start.
- *
- * @param shardingItems to be registered sharding items
+ *
+ * @param shardingItems to be registered sharding items
*/
public void registerStart(final Collection<Integer> shardingItems) {
for (int each : shardingItems) {
@@ -49,6 +49,21 @@ public final class GuaranteeService {
}
/**
+ * Judge whether all of the current sharding items have been registered as started.
+ *
+ * @param shardingItems current sharding items
+ * @return whether all of the current sharding items have been registered as started
+ */
+ public boolean isRegisterStartSuccess(final Collection<Integer> shardingItems) {
+ for (int each : shardingItems) {
+ if (!jobNodeStorage.isJobNodeExisted(GuaranteeNode.getStartedNode(each))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Judge whether job's sharding items are all started.
*
* @return job's sharding items are all started or not
@@ -77,6 +92,21 @@ public final class GuaranteeService {
}
/**
+ * Judge whether all of the current sharding items have been registered as completed.
+ *
+ * @param shardingItems current sharding items
+ * @return whether all of the current sharding items have been registered as completed
+ */
+ public boolean isRegisterCompleteSuccess(final Collection<Integer> shardingItems) {
+ for (int each : shardingItems) {
+ if (!jobNodeStorage.isJobNodeExisted(GuaranteeNode.getCompletedNode(each))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Judge whether job's sharding items are all completed.
*
* @return job's sharding items are all completed or not
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
index e85137b..1465643 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java
@@ -68,6 +68,7 @@ public final class DistributeOnceElasticJobListenerTest {
@Test
public void assertBeforeJobExecutedWhenIsAllStarted() {
+ when(guaranteeService.isRegisterStartSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
when(guaranteeService.isAllStarted()).thenReturn(true);
distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
verify(guaranteeService).registerStart(Sets.newHashSet(0, 1));
@@ -77,6 +78,7 @@ public final class DistributeOnceElasticJobListenerTest {
@Test
public void assertBeforeJobExecutedWhenIsNotAllStartedAndNotTimeout() {
+ when(guaranteeService.isRegisterStartSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
when(guaranteeService.isAllStarted()).thenReturn(false);
when(timeService.getCurrentMillis()).thenReturn(0L);
distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
@@ -86,6 +88,7 @@ public final class DistributeOnceElasticJobListenerTest {
@Test(expected = JobSystemException.class)
public void assertBeforeJobExecutedWhenIsNotAllStartedAndTimeout() {
+ when(guaranteeService.isRegisterStartSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
when(guaranteeService.isAllStarted()).thenReturn(false);
when(timeService.getCurrentMillis()).thenReturn(0L, 2L);
distributeOnceElasticJobListener.beforeJobExecuted(shardingContexts);
@@ -95,6 +98,7 @@ public final class DistributeOnceElasticJobListenerTest {
@Test
public void assertAfterJobExecutedWhenIsAllCompleted() {
+ when(guaranteeService.isRegisterCompleteSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
when(guaranteeService.isAllCompleted()).thenReturn(true);
distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
verify(guaranteeService).registerComplete(Sets.newHashSet(0, 1));
@@ -104,6 +108,7 @@ public final class DistributeOnceElasticJobListenerTest {
@Test
public void assertAfterJobExecutedWhenIsAllCompletedAndNotTimeout() {
+ when(guaranteeService.isRegisterCompleteSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
when(guaranteeService.isAllCompleted()).thenReturn(false);
when(timeService.getCurrentMillis()).thenReturn(0L);
distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
@@ -113,6 +118,7 @@ public final class DistributeOnceElasticJobListenerTest {
@Test(expected = JobSystemException.class)
public void assertAfterJobExecutedWhenIsAllCompletedAndTimeout() {
+ when(guaranteeService.isRegisterCompleteSuccess(Sets.newHashSet(0, 1))).thenReturn(true);
when(guaranteeService.isAllCompleted()).thenReturn(false);
when(timeService.getCurrentMillis()).thenReturn(0L, 2L);
distributeOnceElasticJobListener.afterJobExecuted(shardingContexts);
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
index df599be..d7ab21e 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.integrate.disable;
import org.apache.shardingsphere.elasticjob.lite.api.job.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.fixture.job.DetailedFooJob;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerStatus;
import org.apache.shardingsphere.elasticjob.lite.util.concurrent.BlockUtils;
@@ -41,10 +42,9 @@ public final class ScheduleDisabledJobIntegrateTest extends DisabledJobIntegrate
BlockUtils.waitingShortTime();
assertDisabledRegCenterInfo();
setJobEnable();
- // TODO the job can not enable sometimes
-// while (!((DetailedFooJob) getElasticJob()).isCompleted()) {
-// BlockUtils.waitingShortTime();
-// }
+ while (!((DetailedFooJob) getElasticJob()).isCompleted()) {
+ BlockUtils.waitingShortTime();
+ }
assertEnabledRegCenterInfo();
}
@@ -55,7 +55,6 @@ public final class ScheduleDisabledJobIntegrateTest extends DisabledJobIntegrate
private void assertEnabledRegCenterInfo() {
assertTrue(getRegCenter().isExisted("/" + getJobName() + "/instances/" + JobRegistry.getInstance().getJobInstance(getJobName()).getJobInstanceId()));
getRegCenter().remove("/" + getJobName() + "/leader/election");
- // TODO the job can not enable sometimes, so can not assert if job not schedule
-// assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
+ assertTrue(getRegCenter().isExisted("/" + getJobName() + "/sharding"));
}
}
diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
index fdfb4ad..f82b89f 100644
--- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
+++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeServiceTest.java
@@ -59,6 +59,18 @@ public final class GuaranteeServiceTest {
}
@Test
+ public void assertIsNotRegisterStartSuccess() {
+ assertFalse(guaranteeService.isRegisterStartSuccess(Arrays.asList(0, 1)));
+ }
+
+ @Test
+ public void assertIsRegisterStartSuccess() {
+ when(jobNodeStorage.isJobNodeExisted("guarantee/started/0")).thenReturn(true);
+ when(jobNodeStorage.isJobNodeExisted("guarantee/started/1")).thenReturn(true);
+ assertTrue(guaranteeService.isRegisterStartSuccess(Arrays.asList(0, 1)));
+ }
+
+ @Test
public void assertIsNotAllStartedWhenRootNodeIsNotExisted() {
when(jobNodeStorage.isJobNodeExisted("guarantee/started")).thenReturn(false);
assertFalse(guaranteeService.isAllStarted());
@@ -95,6 +107,18 @@ public final class GuaranteeServiceTest {
}
@Test
+ public void assertIsNotRegisterCompleteSuccess() {
+ assertFalse(guaranteeService.isRegisterCompleteSuccess(Arrays.asList(0, 1)));
+ }
+
+ @Test
+ public void assertIsRegisterCompleteSuccess() {
+ when(jobNodeStorage.isJobNodeExisted("guarantee/completed/0")).thenReturn(true);
+ when(jobNodeStorage.isJobNodeExisted("guarantee/completed/1")).thenReturn(true);
+ assertTrue(guaranteeService.isRegisterCompleteSuccess(Arrays.asList(0, 1)));
+ }
+
+ @Test
public void assertIsNotAllCompletedWhenRootNodeIsNotExisted() {
when(jobNodeStorage.isJobNodeExisted("guarantee/completed")).thenReturn(false);
assertFalse(guaranteeService.isAllCompleted());