You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/04/26 02:58:17 UTC

[shardingsphere-elasticjob] branch master updated: Make sure supporting the case that job instances are crashed while executing failover. (#1873)

This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b36b75  Make sure supporting the case that job instances are crashed while executing failover. (#1873)
4b36b75 is described below

commit 4b36b751cc55d55aa020f317c42cac4680eb1386
Author: sucg <47...@qq.com>
AuthorDate: Mon Apr 26 10:58:06 2021 +0800

    Make sure supporting the case that job instances are crashed while executing failover. (#1873)
    
    Co-authored-by: suchunguan <su...@jd.com>
---
 .../infra/handler/sharding/JobInstance.java        |  2 +-
 .../internal/failover/FailoverListenerManager.java |  6 ++--
 .../lite/internal/failover/FailoverNode.java       |  8 ++++++
 .../lite/internal/failover/FailoverService.java    | 33 +++++++++++++++++++++-
 .../lite/internal/sharding/ShardingService.java    | 21 ++++++++++++++
 .../failover/FailoverListenerManagerTest.java      |  6 ++--
 .../lite/internal/failover/FailoverNodeTest.java   |  5 ++++
 .../internal/failover/FailoverServiceTest.java     | 25 ++++++++++++++++
 .../internal/sharding/ShardingServiceTest.java     | 20 ++++++++++++-
 9 files changed, 117 insertions(+), 9 deletions(-)

diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
index 2b0ffc7..48ef8bb 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
@@ -32,7 +32,7 @@ import java.lang.management.ManagementFactory;
 @EqualsAndHashCode(of = "jobInstanceId")
 public final class JobInstance {
     
-    private static final String DELIMITER = "@-@";
+    public static final String DELIMITER = "@-@";
     
     private String jobInstanceId;
     
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
index 2eedab3..6f46161 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
@@ -76,14 +76,14 @@ public final class FailoverListenerManager extends AbstractListenerManager {
                 if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                     return;
                 }
-                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
+                List<Integer> failoverItems = failoverService.getFailoveringItems(jobInstanceId);
                 if (!failoverItems.isEmpty()) {
                     for (int each : failoverItems) {
-                        failoverService.setCrashedFailoverFlag(each);
+                        failoverService.setCrashedFailoverFlagDirectly(each);
                         failoverService.failoverIfNecessary();
                     }
                 } else {
-                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
+                    for (int each : shardingService.getCrashedShardingItems(jobInstanceId)) {
                         failoverService.setCrashedFailoverFlag(each);
                         failoverService.failoverIfNecessary();
                     }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
index 734faf7..113d16b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
@@ -37,6 +37,10 @@ public final class FailoverNode {
     static final String LATCH = LEADER_ROOT + "/latch";
     
     private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;
+
+    private static final String FAILOVERING = "failovering";
+
+    private static final String EXECUTING_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVERING;
     
     private final JobNodePath jobNodePath;
     
@@ -51,6 +55,10 @@ public final class FailoverNode {
     static String getExecutionFailoverNode(final int item) {
         return String.format(EXECUTION_FAILOVER, item);
     }
+
+    static String getExecutingFailoverNode(final int item) {
+        return String.format(EXECUTING_FAILOVER, item);
+    }
     
     /**
      * Get sharding item by execution failover path.
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
index 5d8a93e..56594af 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
@@ -59,7 +59,16 @@ public final class FailoverService {
             jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
         }
     }
-    
+
+    /**
+     * Set crashed failover flag directly, creating the failover items node for the item without any precondition check.
+     *
+     * @param item crashed sharding item
+     */
+    public void setCrashedFailoverFlagDirectly(final int item) {
+        jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
+    }
+
     private boolean isFailoverAssigned(final Integer item) {
         return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
     }
@@ -86,6 +95,7 @@ public final class FailoverService {
     public void updateFailoverComplete(final Collection<Integer> items) {
         for (int each : items) {
             jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutionFailoverNode(each));
+            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutingFailoverNode(each));
         }
     }
     
@@ -108,6 +118,26 @@ public final class FailoverService {
         Collections.sort(result);
         return result;
     }
+
+    /**
+     * Get failovering items, i.e. sharding items whose failovering node exists and stores the given job instance ID.
+     *
+     * @param jobInstanceId job instance ID
+     * @return failovering items of the job instance, sorted in ascending order
+     */
+    public List<Integer> getFailoveringItems(final String jobInstanceId) {
+        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
+        List<Integer> result = new ArrayList<>(items.size());
+        for (String each : items) {
+            int item = Integer.parseInt(each);
+            String node = FailoverNode.getExecutingFailoverNode(item);
+            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
+                result.add(item);
+            }
+        }
+        Collections.sort(result);
+        return result;
+    }
     
     /**
      * Get failover items which execute on localhost.
@@ -156,6 +186,7 @@ public final class FailoverService {
             int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
             log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
             jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+            jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
             jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
             // TODO Instead of using triggerJob, use executor for unified scheduling
             JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index e9e8bb1..9ac967e 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -174,6 +174,27 @@ public final class ShardingService {
         }
         return result;
     }
+
+    /**
+     * Get crashed sharding items, i.e. sharding items whose instance node stores the crashed job instance ID.
+     *
+     * @param jobInstanceId crashed job instance ID; the text before {@code JobInstance.DELIMITER} is used as the server IP
+     * @return crashed sharding items, or an empty list if the server of the job instance is not enabled
+     */
+    public List<Integer> getCrashedShardingItems(final String jobInstanceId) {
+        String serverIp = jobInstanceId.substring(0, jobInstanceId.indexOf(JobInstance.DELIMITER));
+        if (!serverService.isEnableServer(serverIp)) {
+            return Collections.emptyList();
+        }
+        List<Integer> result = new LinkedList<>();
+        int shardingTotalCount = configService.load(true).getShardingTotalCount();
+        for (int i = 0; i < shardingTotalCount; i++) {
+            if (jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
+                result.add(i);
+            }
+        }
+        return result;
+    }
     
     /**
      * Get sharding items from localhost job server.
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
index 4d4e93b..8cce3f9 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
@@ -117,7 +117,7 @@ public final class FailoverListenerManagerTest {
         JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
         JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
-        when(shardingService.getShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
+        when(shardingService.getCrashedShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
         failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_DELETED, "");
         verify(failoverService).setCrashedFailoverFlag(0);
         verify(failoverService).setCrashedFailoverFlag(2);
@@ -130,9 +130,9 @@ public final class FailoverListenerManagerTest {
         JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
         JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
-        when(failoverService.getFailoverItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
+        when(failoverService.getFailoveringItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
         failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_DELETED, "");
-        verify(failoverService).setCrashedFailoverFlag(1);
+        verify(failoverService).setCrashedFailoverFlagDirectly(1);
         verify(failoverService).failoverIfNecessary();
         JobRegistry.getInstance().shutdown("test_job");
     }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
index c215c7b..0ef550f 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
@@ -46,4 +46,9 @@ public final class FailoverNodeTest {
     public void assertGetItemByExecutionFailoverPath() {
         assertThat(failoverNode.getItemByExecutionFailoverPath("/test_job/sharding/0/failover"), is(0));
     }
+
+    @Test
+    public void assertGetProcessingFailoverNode() {
+        assertThat(FailoverNode.getExecutingFailoverNode(0), is("sharding/0/failovering"));
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
index a5ab300..ce6d512 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
@@ -81,6 +81,12 @@ public final class FailoverServiceTest {
         verify(jobNodeStorage).isJobNodeExisted("sharding/0/failover");
         verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
     }
+
+    @Test
+    public void assertSetCrashedFailoverFlagDirectly() {
+        failoverService.setCrashedFailoverFlagDirectly(0); // exercise the direct variant, which creates the items node with no prior check
+        verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
+    }
     
     @Test
     public void assertFailoverIfUnnecessaryWhenItemsRootNodeNotExisted() {
@@ -146,11 +152,30 @@ public final class FailoverServiceTest {
         verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
         verify(jobNodeStorage, times(2)).getJobNodeChildrenKeys("leader/failover/items");
         verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/failover", "127.0.0.1@-@0");
+        verify(jobNodeStorage).fillJobNode("sharding/0/failovering", "127.0.0.1@-@0");
         verify(jobNodeStorage).removeJobNodeIfExisted("leader/failover/items/0");
         verify(jobScheduleController).triggerJob();
         JobRegistry.getInstance().setJobRunning("test_job", false);
         JobRegistry.getInstance().shutdown("test_job");
     }
+
+    @Test
+    public void assertGetFailoveringItems() {
+        JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
+        when(jobNodeStorage.getJobNodeChildrenKeys("sharding")).thenReturn(Arrays.asList("0", "1", "2"));
+        when(jobNodeStorage.isJobNodeExisted("sharding/0/failovering")).thenReturn(true);
+        when(jobNodeStorage.isJobNodeExisted("sharding/1/failovering")).thenReturn(true);
+        when(jobNodeStorage.isJobNodeExisted("sharding/2/failovering")).thenReturn(false);
+        when(jobNodeStorage.getJobNodeDataDirectly("sharding/0/failovering")).thenReturn("127.0.0.1@-@0");
+        when(jobNodeStorage.getJobNodeDataDirectly("sharding/1/failovering")).thenReturn("127.0.0.1@-@1");
+        assertThat(failoverService.getFailoveringItems("127.0.0.1@-@1"), is(Collections.singletonList(1)));
+        verify(jobNodeStorage).getJobNodeChildrenKeys("sharding");
+        verify(jobNodeStorage).isJobNodeExisted("sharding/0/failovering");
+        verify(jobNodeStorage).isJobNodeExisted("sharding/1/failovering");
+        verify(jobNodeStorage).getJobNodeDataDirectly("sharding/0/failovering");
+        verify(jobNodeStorage).getJobNodeDataDirectly("sharding/1/failovering");
+        JobRegistry.getInstance().shutdown("test_job");
+    }
     
     @Test
     public void assertUpdateFailoverComplete() {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 5f5eee6..12a433b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -186,7 +186,7 @@ public final class ShardingServiceTest {
     }
     
     @Test
-    public void assertGetShardingItemsWithEnabledServer() {
+    public void assertGetShardingItemsWithAvailableServer() {
         JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
         JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
         when(serverService.isAvailableServer("127.0.0.1")).thenReturn(true);
@@ -274,4 +274,22 @@ public final class ShardingServiceTest {
         verify(transactionOp, times(3)).create();
         verify(transactionOp, times(2)).delete();
     }
+
+    @Test
+    public void assertGetCrashedShardingItemsWithNotEnableServer() {
+        assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), is(Collections.<Integer>emptyList()));
+    }
+
+    @Test
+    public void assertGetCrashedShardingItemsWithEnabledServer() {
+        JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
+        JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
+        when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").build());
+        when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
+        when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
+        when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+        assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), is(Arrays.asList(0, 2)));
+        JobRegistry.getInstance().shutdown("test_job");
+    }
 }