You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/21 02:21:55 UTC
[shardingsphere-elasticjob] branch master updated: Only leader can
set resharding flag (#1472)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 724c8d4 Only leader can set resharding flag (#1472)
724c8d4 is described below
commit 724c8d4cfcdf8b861baa7d83e8594ce4c30a31e6
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Mon Sep 21 10:21:41 2020 +0800
Only leader can set resharding flag (#1472)
---
.../elasticjob/lite/internal/reconcile/ReconcileService.java | 6 +-----
.../elasticjob/lite/internal/setup/SetUpFacade.java | 5 -----
.../elasticjob/lite/internal/sharding/ShardingService.java | 3 +++
.../lite/internal/reconcile/ReconcileServiceTest.java | 7 -------
.../elasticjob/lite/internal/setup/SetUpFacadeTest.java | 6 ------
.../elasticjob/lite/internal/sharding/ShardingServiceTest.java | 10 +++++++++-
6 files changed, 13 insertions(+), 24 deletions(-)
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java
index 6069c2f..4d79186 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.elasticjob.lite.internal.reconcile;
import com.google.common.util.concurrent.AbstractScheduledService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
-import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
@@ -38,13 +37,10 @@ public final class ReconcileService extends AbstractScheduledService {
private final ShardingService shardingService;
- private final LeaderService leaderService;
-
public ReconcileService(final CoordinatorRegistryCenter regCenter, final String jobName) {
lastReconcileTime = System.currentTimeMillis();
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
- leaderService = new LeaderService(regCenter, jobName);
}
@Override
@@ -52,7 +48,7 @@ public final class ReconcileService extends AbstractScheduledService {
int reconcileIntervalMinutes = configService.load(true).getReconcileIntervalMinutes();
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
lastReconcileTime = System.currentTimeMillis();
- if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
+ if (!shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
shardingService.setReshardingFlag();
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
index 175a3dc..97b5263 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceServi
import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
-import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import java.util.List;
@@ -43,8 +42,6 @@ public final class SetUpFacade {
private final InstanceService instanceService;
- private final ShardingService shardingService;
-
private final ReconcileService reconcileService;
private final ListenerManager listenerManager;
@@ -54,7 +51,6 @@ public final class SetUpFacade {
leaderService = new LeaderService(regCenter, jobName);
serverService = new ServerService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
- shardingService = new ShardingService(regCenter, jobName);
reconcileService = new ReconcileService(regCenter, jobName);
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}
@@ -80,7 +76,6 @@ public final class SetUpFacade {
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
- shardingService.setReshardingFlag();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index 78bf594..a2c0871 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -80,6 +80,9 @@ public final class ShardingService {
* Set resharding flag.
*/
public void setReshardingFlag() {
+ if (!leaderService.isLeaderUntilBlock()) {
+ return;
+ }
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java
index 6ff3523..850cf18 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.elasticjob.lite.internal.reconcile;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
-import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -42,9 +41,6 @@ public final class ReconcileServiceTest {
@Mock
private ShardingService shardingService;
- @Mock
- private LeaderService leaderService;
-
private ReconcileService reconcileService;
@Before
@@ -54,7 +50,6 @@ public final class ReconcileServiceTest {
ReflectionUtils.setFieldValue(reconcileService, "lastReconcileTime", 1L);
ReflectionUtils.setFieldValue(reconcileService, "configService", configService);
ReflectionUtils.setFieldValue(reconcileService, "shardingService", shardingService);
- ReflectionUtils.setFieldValue(reconcileService, "leaderService", leaderService);
}
@Test
@@ -62,11 +57,9 @@ public final class ReconcileServiceTest {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").reconcileIntervalMinutes(1).build());
when(shardingService.isNeedSharding()).thenReturn(false);
when(shardingService.hasShardingInfoInOfflineServers()).thenReturn(true);
- when(leaderService.isLeaderUntilBlock()).thenReturn(true);
reconcileService.runOneIteration();
verify(shardingService).isNeedSharding();
verify(shardingService).hasShardingInfoInOfflineServers();
verify(shardingService).setReshardingFlag();
- verify(leaderService).isLeaderUntilBlock();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
index ff1c283..0c27d2f 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManag
import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
-import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
import org.junit.Before;
import org.junit.Test;
@@ -58,9 +57,6 @@ public final class SetUpFacadeTest {
private InstanceService instanceService;
@Mock
- private ShardingService shardingService;
-
- @Mock
private ReconcileService reconcileService;
@Mock
@@ -76,7 +72,6 @@ public final class SetUpFacadeTest {
ReflectionUtils.setFieldValue(setUpFacade, "leaderService", leaderService);
ReflectionUtils.setFieldValue(setUpFacade, "serverService", serverService);
ReflectionUtils.setFieldValue(setUpFacade, "instanceService", instanceService);
- ReflectionUtils.setFieldValue(setUpFacade, "shardingService", shardingService);
ReflectionUtils.setFieldValue(setUpFacade, "reconcileService", reconcileService);
ReflectionUtils.setFieldValue(setUpFacade, "listenerManager", listenerManager);
}
@@ -96,6 +91,5 @@ public final class SetUpFacadeTest {
verify(listenerManager).startAllListeners();
verify(leaderService).electLeader();
verify(serverService).persistOnline(true);
- verify(shardingService).setReshardingFlag();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 9e82db0..eafbb95 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -97,12 +97,20 @@ public final class ShardingServiceTest {
}
@Test
- public void assertSetReshardingFlag() {
+ public void assertSetReshardingFlagOnLeader() {
+ when(leaderService.isLeaderUntilBlock()).thenReturn(true);
shardingService.setReshardingFlag();
verify(jobNodeStorage).createJobNodeIfNeeded("leader/sharding/necessary");
}
@Test
+ public void assertSetReshardingFlagOnNonLeader() {
+ when(leaderService.isLeaderUntilBlock()).thenReturn(false);
+ shardingService.setReshardingFlag();
+ verify(jobNodeStorage, times(0)).createJobNodeIfNeeded("leader/sharding/necessary");
+ }
+
+ @Test
public void assertIsNeedSharding() {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
assertTrue(shardingService.isNeedSharding());