You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/04/26 02:58:17 UTC
[shardingsphere-elasticjob] branch master updated: Make sure
supporting the case that job instances are crashed while executing
failover. (#1873)
This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 4b36b75 Make sure supporting the case that job instances are crashed while executing failover. (#1873)
4b36b75 is described below
commit 4b36b751cc55d55aa020f317c42cac4680eb1386
Author: sucg <47...@qq.com>
AuthorDate: Mon Apr 26 10:58:06 2021 +0800
Make sure supporting the case that job instances are crashed while executing failover. (#1873)
Co-authored-by: suchunguan <su...@jd.com>
---
.../infra/handler/sharding/JobInstance.java | 2 +-
.../internal/failover/FailoverListenerManager.java | 6 ++--
.../lite/internal/failover/FailoverNode.java | 8 ++++++
.../lite/internal/failover/FailoverService.java | 33 +++++++++++++++++++++-
.../lite/internal/sharding/ShardingService.java | 21 ++++++++++++++
.../failover/FailoverListenerManagerTest.java | 6 ++--
.../lite/internal/failover/FailoverNodeTest.java | 5 ++++
.../internal/failover/FailoverServiceTest.java | 25 ++++++++++++++++
.../internal/sharding/ShardingServiceTest.java | 20 ++++++++++++-
9 files changed, 117 insertions(+), 9 deletions(-)
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
index 2b0ffc7..48ef8bb 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
@@ -32,7 +32,7 @@ import java.lang.management.ManagementFactory;
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {
- private static final String DELIMITER = "@-@";
+ public static final String DELIMITER = "@-@";
private String jobInstanceId;
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
index 2eedab3..6f46161 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
@@ -76,14 +76,14 @@ public final class FailoverListenerManager extends AbstractListenerManager {
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
- List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
+ List<Integer> failoverItems = failoverService.getFailoveringItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
- failoverService.setCrashedFailoverFlag(each);
+ failoverService.setCrashedFailoverFlagDirectly(each);
failoverService.failoverIfNecessary();
}
} else {
- for (int each : shardingService.getShardingItems(jobInstanceId)) {
+ for (int each : shardingService.getCrashedShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
index 734faf7..113d16b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNode.java
@@ -37,6 +37,10 @@ public final class FailoverNode {
static final String LATCH = LEADER_ROOT + "/latch";
private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;
+
+ private static final String FAILOVERING = "failovering";
+
+ private static final String EXECUTING_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVERING;
private final JobNodePath jobNodePath;
@@ -51,6 +55,10 @@ public final class FailoverNode {
static String getExecutionFailoverNode(final int item) {
return String.format(EXECUTION_FAILOVER, item);
}
+
+ static String getExecutingFailoverNode(final int item) {
+ return String.format(EXECUTING_FAILOVER, item);
+ }
/**
* Get sharding item by execution failover path.
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
index 5d8a93e..56594af 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
@@ -59,7 +59,16 @@ public final class FailoverService {
jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
}
}
-
+
+ /**
+ * set crashed failover flag directly.
+ *
+ * @param item crashed item
+ */
+ public void setCrashedFailoverFlagDirectly(final int item) {
+ jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
+ }
+
private boolean isFailoverAssigned(final Integer item) {
return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
}
@@ -86,6 +95,7 @@ public final class FailoverService {
public void updateFailoverComplete(final Collection<Integer> items) {
for (int each : items) {
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutionFailoverNode(each));
+ jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutingFailoverNode(each));
}
}
@@ -108,6 +118,26 @@ public final class FailoverService {
Collections.sort(result);
return result;
}
+
+ /**
+ * Get failovering items.
+ *
+ * @param jobInstanceId job instance ID
+ * @return failovering items
+ */
+ public List<Integer> getFailoveringItems(final String jobInstanceId) {
+ List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
+ List<Integer> result = new ArrayList<>(items.size());
+ for (String each : items) {
+ int item = Integer.parseInt(each);
+ String node = FailoverNode.getExecutingFailoverNode(item);
+ if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
+ result.add(item);
+ }
+ }
+ Collections.sort(result);
+ return result;
+ }
/**
* Get failover items which execute on localhost.
@@ -156,6 +186,7 @@ public final class FailoverService {
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+ jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO Instead of using triggerJob, use executor for unified scheduling
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index e9e8bb1..9ac967e 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -174,6 +174,27 @@ public final class ShardingService {
}
return result;
}
+
+ /**
+ * Get crashed sharding items.
+ *
+ * @param jobInstanceId crashed job instance ID
+ * @return crashed sharding items
+ */
+ public List<Integer> getCrashedShardingItems(final String jobInstanceId) {
+ String serverIp = jobInstanceId.substring(0, jobInstanceId.indexOf(JobInstance.DELIMITER));
+ if (!serverService.isEnableServer(serverIp)) {
+ return Collections.emptyList();
+ }
+ List<Integer> result = new LinkedList<>();
+ int shardingTotalCount = configService.load(true).getShardingTotalCount();
+ for (int i = 0; i < shardingTotalCount; i++) {
+ if (jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
+ result.add(i);
+ }
+ }
+ return result;
+ }
/**
* Get sharding items from localhost job server.
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
index 4d4e93b..8cce3f9 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
@@ -117,7 +117,7 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
- when(shardingService.getShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
+ when(shardingService.getCrashedShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_DELETED, "");
verify(failoverService).setCrashedFailoverFlag(0);
verify(failoverService).setCrashedFailoverFlag(2);
@@ -130,9 +130,9 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
- when(failoverService.getFailoverItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
+ when(failoverService.getFailoveringItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_DELETED, "");
- verify(failoverService).setCrashedFailoverFlag(1);
+ verify(failoverService).setCrashedFailoverFlagDirectly(1);
verify(failoverService).failoverIfNecessary();
JobRegistry.getInstance().shutdown("test_job");
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
index c215c7b..0ef550f 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverNodeTest.java
@@ -46,4 +46,9 @@ public final class FailoverNodeTest {
public void assertGetItemByExecutionFailoverPath() {
assertThat(failoverNode.getItemByExecutionFailoverPath("/test_job/sharding/0/failover"), is(0));
}
+
+ @Test
+ public void assertGetProcessingFailoverNode() {
+ assertThat(FailoverNode.getExecutingFailoverNode(0), is("sharding/0/failovering"));
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
index a5ab300..ce6d512 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
@@ -81,6 +81,12 @@ public final class FailoverServiceTest {
verify(jobNodeStorage).isJobNodeExisted("sharding/0/failover");
verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
}
+
+ @Test
+ public void assertSetCrashedFailoverFlagDirectly() {
+ failoverService.setCrashedFailoverFlag(0);
+ verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
+ }
@Test
public void assertFailoverIfUnnecessaryWhenItemsRootNodeNotExisted() {
@@ -146,11 +152,30 @@ public final class FailoverServiceTest {
verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
verify(jobNodeStorage, times(2)).getJobNodeChildrenKeys("leader/failover/items");
verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/failover", "127.0.0.1@-@0");
+ verify(jobNodeStorage).fillJobNode("sharding/0/failovering", "127.0.0.1@-@0");
verify(jobNodeStorage).removeJobNodeIfExisted("leader/failover/items/0");
verify(jobScheduleController).triggerJob();
JobRegistry.getInstance().setJobRunning("test_job", false);
JobRegistry.getInstance().shutdown("test_job");
}
+
+ @Test
+ public void assertGetFailoveringItems() {
+ JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
+ when(jobNodeStorage.getJobNodeChildrenKeys("sharding")).thenReturn(Arrays.asList("0", "1", "2"));
+ when(jobNodeStorage.isJobNodeExisted("sharding/0/failovering")).thenReturn(true);
+ when(jobNodeStorage.isJobNodeExisted("sharding/1/failovering")).thenReturn(true);
+ when(jobNodeStorage.isJobNodeExisted("sharding/2/failovering")).thenReturn(false);
+ when(jobNodeStorage.getJobNodeDataDirectly("sharding/0/failovering")).thenReturn("127.0.0.1@-@0");
+ when(jobNodeStorage.getJobNodeDataDirectly("sharding/1/failovering")).thenReturn("127.0.0.1@-@1");
+ assertThat(failoverService.getFailoveringItems("127.0.0.1@-@1"), is(Collections.singletonList(1)));
+ verify(jobNodeStorage).getJobNodeChildrenKeys("sharding");
+ verify(jobNodeStorage).isJobNodeExisted("sharding/0/failovering");
+ verify(jobNodeStorage).isJobNodeExisted("sharding/1/failovering");
+ verify(jobNodeStorage).getJobNodeDataDirectly("sharding/0/failovering");
+ verify(jobNodeStorage).getJobNodeDataDirectly("sharding/1/failovering");
+ JobRegistry.getInstance().shutdown("test_job");
+ }
@Test
public void assertUpdateFailoverComplete() {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 5f5eee6..12a433b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -186,7 +186,7 @@ public final class ShardingServiceTest {
}
@Test
- public void assertGetShardingItemsWithEnabledServer() {
+ public void assertGetShardingItemsWithAvailableServer() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(serverService.isAvailableServer("127.0.0.1")).thenReturn(true);
@@ -274,4 +274,22 @@ public final class ShardingServiceTest {
verify(transactionOp, times(3)).create();
verify(transactionOp, times(2)).delete();
}
+
+ @Test
+ public void assertGetCrashedShardingItemsWithNotEnableServer() {
+ assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), is(Collections.<Integer>emptyList()));
+ }
+
+ @Test
+ public void assertGetCrashedShardingItemsWithEnabledServer() {
+ JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
+ JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
+ when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").build());
+ when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
+ when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
+ when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+ assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), is(Arrays.asList(0, 2)));
+ JobRegistry.getInstance().shutdown("test_job");
+ }
}