You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/21 02:21:55 UTC

[shardingsphere-elasticjob] branch master updated: Only leader can set resharding flag (#1472)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 724c8d4  Only leader can set resharding flag (#1472)
724c8d4 is described below

commit 724c8d4cfcdf8b861baa7d83e8594ce4c30a31e6
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Mon Sep 21 10:21:41 2020 +0800

    Only leader can set resharding flag (#1472)
---
 .../elasticjob/lite/internal/reconcile/ReconcileService.java   |  6 +-----
 .../elasticjob/lite/internal/setup/SetUpFacade.java            |  5 -----
 .../elasticjob/lite/internal/sharding/ShardingService.java     |  3 +++
 .../lite/internal/reconcile/ReconcileServiceTest.java          |  7 -------
 .../elasticjob/lite/internal/setup/SetUpFacadeTest.java        |  6 ------
 .../elasticjob/lite/internal/sharding/ShardingServiceTest.java | 10 +++++++++-
 6 files changed, 13 insertions(+), 24 deletions(-)

diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java
index 6069c2f..4d79186 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileService.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.elasticjob.lite.internal.reconcile;
 import com.google.common.util.concurrent.AbstractScheduledService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
-import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 
@@ -38,13 +37,10 @@ public final class ReconcileService extends AbstractScheduledService {
     
     private final ShardingService shardingService;
     
-    private final LeaderService leaderService;
-    
     public ReconcileService(final CoordinatorRegistryCenter regCenter, final String jobName) {
         lastReconcileTime = System.currentTimeMillis();
         configService = new ConfigurationService(regCenter, jobName);
         shardingService = new ShardingService(regCenter, jobName);
-        leaderService = new LeaderService(regCenter, jobName);
     }
     
     @Override
@@ -52,7 +48,7 @@ public final class ReconcileService extends AbstractScheduledService {
         int reconcileIntervalMinutes = configService.load(true).getReconcileIntervalMinutes();
         if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
             lastReconcileTime = System.currentTimeMillis();
-            if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
+            if (!shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
                 log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
                 shardingService.setReshardingFlag();
             }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
index 175a3dc..97b5263 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceServi
 import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager;
 import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService;
 import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
-import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 
 import java.util.List;
@@ -43,8 +42,6 @@ public final class SetUpFacade {
     
     private final InstanceService instanceService;
     
-    private final ShardingService shardingService;
-    
     private final ReconcileService reconcileService;
     
     private final ListenerManager listenerManager;
@@ -54,7 +51,6 @@ public final class SetUpFacade {
         leaderService = new LeaderService(regCenter, jobName);
         serverService = new ServerService(regCenter, jobName);
         instanceService = new InstanceService(regCenter, jobName);
-        shardingService = new ShardingService(regCenter, jobName);
         reconcileService = new ReconcileService(regCenter, jobName);
         listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
     }
@@ -80,7 +76,6 @@ public final class SetUpFacade {
         leaderService.electLeader();
         serverService.persistOnline(enabled);
         instanceService.persistOnline();
-        shardingService.setReshardingFlag();
         if (!reconcileService.isRunning()) {
             reconcileService.startAsync();
         }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index 78bf594..a2c0871 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -80,6 +80,9 @@ public final class ShardingService {
      * Set resharding flag.
      */
     public void setReshardingFlag() {
+        if (!leaderService.isLeaderUntilBlock()) {
+            return;
+        }
         jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
     }
     
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java
index 6ff3523..850cf18 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/reconcile/ReconcileServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.elasticjob.lite.internal.reconcile;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
-import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -42,9 +41,6 @@ public final class ReconcileServiceTest {
     @Mock
     private ShardingService shardingService;
     
-    @Mock
-    private LeaderService leaderService;
-    
     private ReconcileService reconcileService;
     
     @Before
@@ -54,7 +50,6 @@ public final class ReconcileServiceTest {
         ReflectionUtils.setFieldValue(reconcileService, "lastReconcileTime", 1L);
         ReflectionUtils.setFieldValue(reconcileService, "configService", configService);
         ReflectionUtils.setFieldValue(reconcileService, "shardingService", shardingService);
-        ReflectionUtils.setFieldValue(reconcileService, "leaderService", leaderService);
     }
     
     @Test
@@ -62,11 +57,9 @@ public final class ReconcileServiceTest {
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").reconcileIntervalMinutes(1).build());
         when(shardingService.isNeedSharding()).thenReturn(false);
         when(shardingService.hasShardingInfoInOfflineServers()).thenReturn(true);
-        when(leaderService.isLeaderUntilBlock()).thenReturn(true);
         reconcileService.runOneIteration();
         verify(shardingService).isNeedSharding();
         verify(shardingService).hasShardingInfoInOfflineServers();
         verify(shardingService).setReshardingFlag();
-        verify(leaderService).isLeaderUntilBlock();
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
index ff1c283..0c27d2f 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManag
 import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
-import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,9 +57,6 @@ public final class SetUpFacadeTest {
     private InstanceService instanceService;
     
     @Mock
-    private ShardingService shardingService;
-    
-    @Mock
     private ReconcileService reconcileService;
     
     @Mock
@@ -76,7 +72,6 @@ public final class SetUpFacadeTest {
         ReflectionUtils.setFieldValue(setUpFacade, "leaderService", leaderService);
         ReflectionUtils.setFieldValue(setUpFacade, "serverService", serverService);
         ReflectionUtils.setFieldValue(setUpFacade, "instanceService", instanceService);
-        ReflectionUtils.setFieldValue(setUpFacade, "shardingService", shardingService);
         ReflectionUtils.setFieldValue(setUpFacade, "reconcileService", reconcileService);
         ReflectionUtils.setFieldValue(setUpFacade, "listenerManager", listenerManager);
     }
@@ -96,6 +91,5 @@ public final class SetUpFacadeTest {
         verify(listenerManager).startAllListeners();
         verify(leaderService).electLeader();
         verify(serverService).persistOnline(true);
-        verify(shardingService).setReshardingFlag();
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 9e82db0..eafbb95 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -97,12 +97,20 @@ public final class ShardingServiceTest {
     }
     
     @Test
-    public void assertSetReshardingFlag() {
+    public void assertSetReshardingFlagOnLeader() {
+        when(leaderService.isLeaderUntilBlock()).thenReturn(true);
         shardingService.setReshardingFlag();
         verify(jobNodeStorage).createJobNodeIfNeeded("leader/sharding/necessary");
     }
     
     @Test
+    public void assertSetReshardingFlagOnNonLeader() {
+        when(leaderService.isLeaderUntilBlock()).thenReturn(false);
+        shardingService.setReshardingFlag();
+        verify(jobNodeStorage, times(0)).createJobNodeIfNeeded("leader/sharding/necessary");
+    }
+    
+    @Test
     public void assertIsNeedSharding() {
         when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
         assertTrue(shardingService.isNeedSharding());