You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by qi...@apache.org on 2021/03/30 08:18:00 UTC
[shardingsphere-elasticjob] branch master updated: optimize
JobInstance
This is an automated email from the ASF dual-hosted git repository.
qiulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new e5ffb05 optimize JobInstance
new 7ca9ce8 Merge pull request #1861 from Lucas-307/0329.1646
e5ffb05 is described below
commit e5ffb05e6917cde7f5a0ab379513cdb5c0f9736c
Author: qiulu3 <Lucas209910>
AuthorDate: Tue Mar 30 16:15:21 2021 +0800
optimize JobInstance
---
.../infra/handler/sharding/JobInstance.java | 33 ++++++++++++++--------
.../infra/handler/sharding/JobInstanceTest.java | 11 +++++++-
.../internal/election/ElectionListenerManager.java | 2 +-
.../lite/internal/election/LeaderService.java | 2 +-
.../lite/internal/instance/InstanceNode.java | 12 ++++++--
.../lite/internal/instance/InstanceService.java | 7 +++--
.../RegistryCenterConnectionStateListener.java | 2 +-
.../lite/internal/server/ServerNode.java | 2 +-
.../lite/internal/server/ServerService.java | 2 +-
.../lite/internal/sharding/ShardingService.java | 10 +++++--
.../disable/DisabledJobIntegrateTest.java | 4 +--
.../disable/ScheduleDisabledJobIntegrateTest.java | 2 +-
.../integrate/enable/EnabledJobIntegrateTest.java | 4 +--
.../election/ElectionListenerManagerTest.java | 4 +--
.../lite/internal/election/LeaderServiceTest.java | 4 +--
.../internal/instance/InstanceServiceTest.java | 10 ++++---
.../RegistryCenterConnectionStateListenerTest.java | 2 +-
.../lite/internal/server/ServerNodeTest.java | 2 +-
.../lite/internal/server/ServerServiceTest.java | 2 +-
.../internal/sharding/ShardingServiceTest.java | 7 +++--
.../internal/operate/JobOperateAPIImpl.java | 8 ++++--
.../internal/statistics/JobStatisticsAPIImpl.java | 4 ++-
.../statistics/ServerStatisticsAPIImpl.java | 6 ++--
.../statistics/ShardingStatisticsAPIImpl.java | 8 ++++--
.../internal/operate/JobOperateAPIImplTest.java | 3 ++
.../statistics/JobStatisticsAPIImplTest.java | 1 +
.../statistics/ServerStatisticsAPIImplTest.java | 3 ++
.../statistics/ShardingStatisticsAPIImplTest.java | 12 +++++---
28 files changed, 112 insertions(+), 57 deletions(-)
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
index 904c498..2b0ffc7 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstance.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.elasticjob.infra.handler.sharding;
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import java.lang.management.ManagementFactory;
@@ -27,25 +27,34 @@ import java.lang.management.ManagementFactory;
/**
* Job instance.
*/
-@RequiredArgsConstructor
@Getter
+@Setter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {
private static final String DELIMITER = "@-@";
- private final String jobInstanceId;
+ private String jobInstanceId;
+
+ private String labels;
+
+ private String serverIp;
public JobInstance() {
- jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
+ this(IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
+ }
+
+ public JobInstance(final String jobInstanceId) {
+ this(jobInstanceId, null);
+ }
+
+ public JobInstance(final String jobInstanceId, final String labels) {
+ this(jobInstanceId, labels, IpUtils.getIp());
}
- /**
- * Get server IP address.
- *
- * @return server IP address
- */
- public String getIp() {
- return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
+ public JobInstance(final String jobInstanceId, final String labels, final String serverIp) {
+ this.jobInstanceId = jobInstanceId;
+ this.labels = labels;
+ this.serverIp = serverIp;
}
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstanceTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstanceTest.java
index 12d54d0..8c22378 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstanceTest.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/handler/sharding/JobInstanceTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.infra.handler.sharding;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -32,6 +33,14 @@ public final class JobInstanceTest {
@Test
public void assertGetIp() {
- assertThat(new JobInstance().getIp(), is(IpUtils.getIp()));
+ assertThat(new JobInstance().getServerIp(), is(IpUtils.getIp()));
+ }
+
+ @Test
+ public void assertYamlConvert() {
+ JobInstance actual = YamlEngine.unmarshal(YamlEngine.marshal(new JobInstance("id", "labels")), JobInstance.class);
+ assertThat(actual.getJobInstanceId(), is("id"));
+ assertThat(actual.getServerIp(), is(IpUtils.getIp()));
+ assertThat(actual.getLabels(), is("labels"));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java
index 4f4a02b..f06b5bf 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java
@@ -73,7 +73,7 @@ public final class ElectionListenerManager extends AbstractListenerManager {
private boolean isPassiveElection(final String path, final Type eventType) {
JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
- return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getIp());
+ return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getServerIp());
}
private boolean isLeaderCrashed(final String path, final Type eventType) {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderService.java
index 025a16c..49bd262 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderService.java
@@ -66,7 +66,7 @@ public final class LeaderService {
while (!hasLeader() && serverService.hasAvailableServers()) {
log.info("Leader is electing, waiting for {} ms", 100);
BlockUtils.waitingShortTime();
- if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
+ if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp())) {
electLeader();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
index b1929fe..a873a7b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceNode.java
@@ -77,10 +77,16 @@ public final class InstanceNode {
* @return local instance value
*/
public String getLocalInstanceValue() {
- return YamlEngine.marshal(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
+ return YamlEngine.marshal(JobRegistry.getInstance().getJobInstance(jobName));
}
-
- String getInstancePath(final String instanceId) {
+
+ /**
+ * Get instance path.
+ *
+ * @param instanceId instance id
+ * @return instance path
+ */
+ public String getInstancePath(final String instanceId) {
return String.format(INSTANCES, instanceId);
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
index 0cb6375..6d316d9 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceService.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.instance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.internal.trigger.TriggerNode;
@@ -50,7 +51,7 @@ public final class InstanceService {
* Persist job online status.
*/
public void persistOnline() {
- jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstancePath(), "");
+ jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstancePath(), instanceNode.getLocalInstanceValue());
}
/**
@@ -68,8 +69,8 @@ public final class InstanceService {
public List<JobInstance> getAvailableJobInstances() {
List<JobInstance> result = new LinkedList<>();
for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
- JobInstance jobInstance = new JobInstance(each);
- if (serverService.isEnableServer(jobInstance.getIp())) {
+ JobInstance jobInstance = YamlEngine.unmarshal(jobNodeStorage.getJobNodeData(instanceNode.getInstancePath(each)), JobInstance.class);
+ if (serverService.isEnableServer(jobInstance.getServerIp())) {
result.add(new JobInstance(each));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListener.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListener.java
index 2f4e284..b939faa 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListener.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListener.java
@@ -60,7 +60,7 @@ public final class RegistryCenterConnectionStateListener implements ConnectionSt
if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
jobScheduleController.pauseJob();
} else if (ConnectionState.RECONNECTED == newState) {
- serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
+ serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()));
instanceService.persistOnline();
executionService.clearRunningInfo(shardingService.getLocalShardingItems());
jobScheduleController.resumeJob();
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNode.java
index 5cc8058..683e0b8 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNode.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNode.java
@@ -64,7 +64,7 @@ public final class ServerNode {
if (Objects.isNull(jobInstance)) {
return false;
}
- return path.equals(jobNodePath.getFullPath(String.format(SERVERS, jobInstance.getIp())));
+ return path.equals(jobNodePath.getFullPath(String.format(SERVERS, jobInstance.getServerIp())));
}
String getServerNode(final String ip) {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
index 23830c3..1b785da 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java
@@ -50,7 +50,7 @@ public final class ServerService {
*/
public void persistOnline(final boolean enabled) {
if (!JobRegistry.getInstance().isShutdown(jobName)) {
- jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
+ jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index a2c0871..e9e8bb1 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategyFactory;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
@@ -59,6 +60,8 @@ public final class ShardingService {
private final InstanceService instanceService;
+ private final InstanceNode instanceNode;
+
private final ServerService serverService;
private final ExecutionService executionService;
@@ -71,6 +74,7 @@ public final class ShardingService {
leaderService = new LeaderService(regCenter, jobName);
configService = new ConfigurationService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
+ instanceNode = new InstanceNode(jobName);
serverService = new ServerService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
jobNodePath = new JobNodePath(jobName);
@@ -157,8 +161,8 @@ public final class ShardingService {
* @return sharding items
*/
public List<Integer> getShardingItems(final String jobInstanceId) {
- JobInstance jobInstance = new JobInstance(jobInstanceId);
- if (!serverService.isAvailableServer(jobInstance.getIp())) {
+ JobInstance jobInstance = YamlEngine.unmarshal(jobNodeStorage.getJobNodeData(instanceNode.getInstancePath(jobInstanceId)), JobInstance.class);
+ if (!serverService.isAvailableServer(jobInstance.getServerIp())) {
return Collections.emptyList();
}
List<Integer> result = new LinkedList<>();
@@ -177,7 +181,7 @@ public final class ShardingService {
* @return sharding items from localhost job server
*/
public List<Integer> getLocalShardingItems() {
- if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
+ if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp())) {
return Collections.emptyList();
}
return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/DisabledJobIntegrateTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/DisabledJobIntegrateTest.java
index c1a2a35..7124db0 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/DisabledJobIntegrateTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/DisabledJobIntegrateTest.java
@@ -40,7 +40,7 @@ public abstract class DisabledJobIntegrateTest extends BaseIntegrateTest {
protected final void assertDisabledRegCenterInfo() {
assertThat(JobRegistry.getInstance().getCurrentShardingTotalCount(getJobName()), is(3));
- assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getIp(), is(IpUtils.getIp()));
+ assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp(), is(IpUtils.getIp()));
JobConfiguration jobConfig = YamlEngine.unmarshal(getREGISTRY_CENTER().get("/" + getJobName() + "/config"), JobConfigurationPOJO.class).toJobConfiguration();
assertThat(jobConfig.getShardingTotalCount(), is(3));
if (getJobBootstrap() instanceof ScheduleJobBootstrap) {
@@ -49,7 +49,7 @@ public abstract class DisabledJobIntegrateTest extends BaseIntegrateTest {
assertNull(jobConfig.getCron());
}
assertThat(jobConfig.getShardingItemParameters(), is("0=A,1=B,2=C"));
- assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getIp()), is(ServerStatus.DISABLED.name()));
+ assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp()), is(ServerStatus.DISABLED.name()));
while (null != getREGISTRY_CENTER().get("/" + getJobName() + "/leader/election/instance")) {
BlockUtils.waitingShortTime();
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
index 36effec..bebc278 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/disable/ScheduleDisabledJobIntegrateTest.java
@@ -50,7 +50,7 @@ public final class ScheduleDisabledJobIntegrateTest extends DisabledJobIntegrate
}
private void setJobEnable() {
- getREGISTRY_CENTER().persist("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getIp(), ServerStatus.ENABLED.name());
+ getREGISTRY_CENTER().persist("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp(), ServerStatus.ENABLED.name());
}
private void assertEnabledRegCenterInfo() {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/enable/EnabledJobIntegrateTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/enable/EnabledJobIntegrateTest.java
index 4766fe7..2c39eaf 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/enable/EnabledJobIntegrateTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/enable/EnabledJobIntegrateTest.java
@@ -42,7 +42,7 @@ public abstract class EnabledJobIntegrateTest extends BaseIntegrateTest {
@Before
public final void assertEnabledRegCenterInfo() {
assertThat(JobRegistry.getInstance().getCurrentShardingTotalCount(getJobName()), is(3));
- assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getIp(), is(IpUtils.getIp()));
+ assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp(), is(IpUtils.getIp()));
JobConfiguration jobConfig = YamlEngine.unmarshal(getREGISTRY_CENTER().get("/" + getJobName() + "/config"), JobConfigurationPOJO.class).toJobConfiguration();
assertThat(jobConfig.getShardingTotalCount(), is(3));
if (getJobBootstrap() instanceof ScheduleJobBootstrap) {
@@ -51,7 +51,7 @@ public abstract class EnabledJobIntegrateTest extends BaseIntegrateTest {
assertNull(jobConfig.getCron());
}
assertThat(jobConfig.getShardingItemParameters(), is("0=A,1=B,2=C"));
- assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getIp()), is(ServerStatus.ENABLED.name()));
+ assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp()), is(ServerStatus.ENABLED.name()));
assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/leader/election/instance"), is(JobRegistry.getInstance().getJobInstance(getJobName()).getJobInstanceId()));
assertTrue(getREGISTRY_CENTER().isExisted("/" + getJobName() + "/instances/" + JobRegistry.getInstance().getJobInstance(getJobName()).getJobInstanceId()));
getREGISTRY_CENTER().remove("/" + getJobName() + "/leader/election");
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java
index d9b59d5..93995db 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java
@@ -24,8 +24,8 @@ import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleCo
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerStatus;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -59,7 +59,7 @@ public final class ElectionListenerManagerTest {
@Before
public void setUp() {
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
ReflectionUtils.setSuperclassFieldValue(electionListenerManager, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(electionListenerManager, "leaderService", leaderService);
ReflectionUtils.setFieldValue(electionListenerManager, "serverService", serverService);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderServiceTest.java
index 5d5d6ec..b469f8c 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/LeaderServiceTest.java
@@ -23,8 +23,8 @@ import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -58,7 +58,7 @@ public final class LeaderServiceTest {
@Before
public void setUp() {
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
leaderService = new LeaderService(null, "test_job");
ReflectionUtils.setFieldValue(leaderService, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(leaderService, "serverService", serverService);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
index 03c0ba7..5bc3ee7 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/InstanceServiceTest.java
@@ -50,7 +50,7 @@ public final class InstanceServiceTest {
@Before
public void setUp() {
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
instanceService = new InstanceService(null, "test_job");
InstanceNode instanceNode = new InstanceNode("test_job");
ReflectionUtils.setFieldValue(instanceService, "instanceNode", instanceNode);
@@ -61,9 +61,9 @@ public final class InstanceServiceTest {
@Test
public void assertPersistOnline() {
instanceService.persistOnline();
- verify(jobNodeStorage).fillEphemeralJobNode("instances/127.0.0.1@-@0", "");
+ verify(jobNodeStorage).fillEphemeralJobNode("instances/127.0.0.1@-@0", "jobInstanceId: 127.0.0.1@-@0\nserverIp: 127.0.0.1\n");
}
-
+
@Test
public void assertRemoveInstance() {
instanceService.removeInstance();
@@ -73,6 +73,8 @@ public final class InstanceServiceTest {
@Test
public void assertGetAvailableJobInstances() {
when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
+ when(jobNodeStorage.getJobNodeData("instances/127.0.0.1@-@0")).thenReturn("jobInstanceId: 127.0.0.1@-@0\nlabels: labels\nserverIp: 127.0.0.1\n");
+ when(jobNodeStorage.getJobNodeData("instances/127.0.0.2@-@0")).thenReturn("jobInstanceId: 127.0.0.2@-@0\nlabels: labels\nserverIp: 127.0.0.2\n");
when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
assertThat(instanceService.getAvailableJobInstances(), is(Collections.singletonList(new JobInstance("127.0.0.1@-@0"))));
}
@@ -82,7 +84,7 @@ public final class InstanceServiceTest {
when(jobNodeStorage.isJobNodeExisted("instances/127.0.0.1@-@0")).thenReturn(true);
assertTrue(instanceService.isLocalJobInstanceExisted());
}
-
+
@Test
public void assertTriggerAllInstances() {
when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListenerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListenerTest.java
index 5a5e178..a3b6171 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListenerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/RegistryCenterConnectionStateListenerTest.java
@@ -64,7 +64,7 @@ public final class RegistryCenterConnectionStateListenerTest {
@Before
public void setUp() {
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
regCenterConnectionStateListener = new RegistryCenterConnectionStateListener(null, "test_job");
ReflectionUtils.setFieldValue(regCenterConnectionStateListener, "serverService", serverService);
ReflectionUtils.setFieldValue(regCenterConnectionStateListener, "instanceService", instanceService);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNodeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNodeTest.java
index f0e07fd..dea0a44 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNodeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerNodeTest.java
@@ -33,7 +33,7 @@ public final class ServerNodeTest {
@BeforeClass
public static void setUp() {
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
}
@Test
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerServiceTest.java
index 337a5d7..e136af6 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerServiceTest.java
@@ -54,7 +54,7 @@ public final class ServerServiceTest {
@Before
public void setUp() {
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
serverService = new ServerService(null, "test_job");
ServerNode serverNode = new ServerNode("test_job");
ReflectionUtils.setFieldValue(serverService, "serverNode", serverNode);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index eafbb95..5f5eee6 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -93,7 +93,7 @@ public final class ShardingServiceTest {
ReflectionUtils.setFieldValue(shardingService, "executionService", executionService);
ReflectionUtils.setFieldValue(shardingService, "instanceService", instanceService);
ReflectionUtils.setFieldValue(shardingService, "serverService", serverService);
- JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
}
@Test
@@ -178,9 +178,10 @@ public final class ShardingServiceTest {
verify(jobNodeStorage).fillEphemeralJobNode("leader/sharding/processing", "");
verify(jobNodeStorage).executeInTransaction(any(TransactionExecutionCallback.class));
}
-
+
@Test
public void assertGetShardingItemsWithNotAvailableServer() {
+ when(jobNodeStorage.getJobNodeData("instances/127.0.0.1@-@0")).thenReturn("jobInstanceId: 127.0.0.1@-@0\nserverIp: 127.0.0.1\n");
assertThat(shardingService.getShardingItems("127.0.0.1@-@0"), is(Collections.<Integer>emptyList()));
}
@@ -193,6 +194,7 @@ public final class ShardingServiceTest {
when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+ when(jobNodeStorage.getJobNodeData("instances/127.0.0.1@-@0")).thenReturn("jobInstanceId: 127.0.0.1@-@0\nserverIp: 127.0.0.1\n");
assertThat(shardingService.getShardingItems("127.0.0.1@-@0"), is(Arrays.asList(0, 2)));
JobRegistry.getInstance().shutdown("test_job");
}
@@ -219,6 +221,7 @@ public final class ShardingServiceTest {
when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+ when(jobNodeStorage.getJobNodeData("instances/127.0.0.1@-@0")).thenReturn("jobInstanceId: 127.0.0.1@-@0\nserverIp: 127.0.0.1\n");
assertThat(shardingService.getLocalShardingItems(), is(Arrays.asList(0, 2)));
JobRegistry.getInstance().shutdown("test_job");
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImpl.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImpl.java
index 7721d5d..35ba1f9 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImpl.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImpl.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate;
import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerStatus;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
@@ -94,7 +96,8 @@ public final class JobOperateAPIImpl implements JobOperateAPI {
if (null != jobName && null != serverIp) {
JobNodePath jobNodePath = new JobNodePath(jobName);
for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
- if (serverIp.equals(each.split("@-@")[0])) {
+ JobInstance jobInstance = YamlEngine.unmarshal(regCenter.get(jobNodePath.getInstanceNodePath(each)), JobInstance.class);
+ if (serverIp.equals(jobInstance.getServerIp())) {
regCenter.remove(jobNodePath.getInstanceNodePath(each));
}
}
@@ -109,7 +112,8 @@ public final class JobOperateAPIImpl implements JobOperateAPI {
JobNodePath jobNodePath = new JobNodePath(job);
List<String> instances = regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath());
for (String each : instances) {
- if (serverIp.equals(each.split("@-@")[0])) {
+ JobInstance jobInstance = YamlEngine.unmarshal(regCenter.get(jobNodePath.getInstanceNodePath(each)), JobInstance.class);
+ if (serverIp.equals(jobInstance.getServerIp())) {
regCenter.remove(jobNodePath.getInstanceNodePath(each));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java
index f410a86..746f3d5 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
@@ -159,7 +160,8 @@ public final class JobStatisticsAPIImpl implements JobStatisticsAPI {
JobNodePath jobNodePath = new JobNodePath(jobName);
List<String> instances = regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath());
for (String each : instances) {
- if (ip.equals(each.split("@-@")[0])) {
+ JobInstance jobInstance = YamlEngine.unmarshal(regCenter.get(jobNodePath.getInstanceNodePath(each)), JobInstance.class);
+ if (ip.equals(jobInstance.getServerIp())) {
result++;
}
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java
index 4e8acf2..0e73628 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.ServerStatisticsAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ServerBriefInfo;
@@ -65,8 +67,8 @@ public final class ServerStatisticsAPIImpl implements ServerStatisticsAPI {
}
List<String> instances = regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath());
for (String each : instances) {
- String serverIp = each.split("@-@")[0];
- ServerBriefInfo serverInfo = servers.get(serverIp);
+ JobInstance jobInstance = YamlEngine.unmarshal(regCenter.get(jobNodePath.getInstanceNodePath(each)), JobInstance.class);
+ ServerBriefInfo serverInfo = servers.get(jobInstance.getServerIp());
if (null != serverInfo) {
serverInfo.getInstances().add(each);
serverInfo.setInstancesNum(serverInfo.getInstances().size());
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImpl.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImpl.java
index df379c9..46a316e 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImpl.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImpl.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.ShardingStatisticsAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
@@ -59,9 +61,9 @@ public final class ShardingStatisticsAPIImpl implements ShardingStatisticsAPI {
result.setStatus(ShardingInfo.ShardingStatus.getShardingStatus(disabled, running, shardingError));
result.setFailover(regCenter.isExisted(jobNodePath.getShardingNodePath(item, "failover")));
if (null != instanceId) {
- String[] ipAndPid = instanceId.split("@-@");
- result.setServerIp(ipAndPid[0]);
- result.setInstanceId(ipAndPid[1]);
+ JobInstance jobInstance = YamlEngine.unmarshal(regCenter.get(jobNodePath.getInstanceNodePath(instanceId)), JobInstance.class);
+ result.setServerIp(jobInstance.getServerIp());
+ result.setInstanceId(jobInstance.getJobInstanceId());
}
return result;
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImplTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImplTest.java
index fed2885..6e9c0a7 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImplTest.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/JobOperateAPIImplTest.java
@@ -110,6 +110,7 @@ public final class JobOperateAPIImplTest {
@Test
public void assertShutdownWithJobNameAndServerIp() {
when(regCenter.getChildrenKeys("/test_job/instances")).thenReturn(Collections.singletonList("localhost@-@defaultInstance"));
+ when(regCenter.get("/test_job/instances/localhost@-@defaultInstance")).thenReturn("jobInstanceId: localhost@-@defaultInstance\nserverIp: localhost\n");
jobOperateAPI.shutdown("test_job", "localhost");
verify(regCenter).remove("/test_job/instances/localhost@-@defaultInstance");
}
@@ -127,6 +128,8 @@ public final class JobOperateAPIImplTest {
when(regCenter.getChildrenKeys("/")).thenReturn(Arrays.asList("test_job1", "test_job2"));
when(regCenter.getChildrenKeys("/test_job1/instances")).thenReturn(Collections.singletonList("localhost@-@defaultInstance"));
when(regCenter.getChildrenKeys("/test_job2/instances")).thenReturn(Collections.singletonList("localhost@-@defaultInstance"));
+ when(regCenter.get("/test_job1/instances/localhost@-@defaultInstance")).thenReturn("jobInstanceId: localhost@-@defaultInstance\nserverIp: localhost\n");
+ when(regCenter.get("/test_job2/instances/localhost@-@defaultInstance")).thenReturn("jobInstanceId: localhost@-@defaultInstance\nserverIp: localhost\n");
jobOperateAPI.shutdown(null, "localhost");
verify(regCenter).getChildrenKeys("/");
verify(regCenter).remove("/test_job1/instances/localhost@-@defaultInstance");
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImplTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImplTest.java
index 246e004..d0d6497 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImplTest.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/JobStatisticsAPIImplTest.java
@@ -156,6 +156,7 @@ public final class JobStatisticsAPIImplTest {
when(regCenter.isExisted("/test_job_2/servers/ip1")).thenReturn(true);
when(regCenter.get("/test_job_2/servers/ip1")).thenReturn("DISABLED");
when(regCenter.getChildrenKeys("/test_job_1/instances")).thenReturn(Collections.singletonList("ip1@-@defaultInstance"));
+ when(regCenter.get("/test_job_1/instances/ip1@-@defaultInstance")).thenReturn("jobInstanceId: ip1@-@defaultInstance\nserverIp: ip1\n");
int i = 0;
for (JobBriefInfo each : jobStatisticsAPI.getJobsBriefInfo("ip1")) {
assertThat(each.getJobName(), is("test_job_" + ++i));
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImplTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImplTest.java
index b1eddc8..eaa1ed8 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImplTest.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImplTest.java
@@ -66,6 +66,9 @@ public final class ServerStatisticsAPIImplTest {
when(regCenter.get("/test_job2/servers/ip1")).thenReturn("DISABLED");
when(regCenter.get("/test_job2/servers/ip2")).thenReturn("DISABLED");
+ when(regCenter.get("/test_job1/instances/ip1@-@defaultInstance")).thenReturn("jobInstanceId: ip1@-@defaultInstance\nserverIp: ip1\n");
+ when(regCenter.get("/test_job2/instances/ip1@-@defaultInstance")).thenReturn("jobInstanceId: ip1@-@defaultInstance\nserverIp: ip1\n");
+ when(regCenter.get("/test_job2/instances/ip2@-@defaultInstance2")).thenReturn("jobInstanceId: ip2@-@defaultInstance2\nserverIp: ip2\n");
when(regCenter.getChildrenKeys("/test_job2/instances")).thenReturn(Arrays.asList("ip1@-@defaultInstance", "ip2@-@defaultInstance2"));
int i = 0;
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImplTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImplTest.java
index 447d06f..6a69e1a 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImplTest.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ShardingStatisticsAPIImplTest.java
@@ -53,6 +53,10 @@ public final class ShardingStatisticsAPIImplTest {
when(regCenter.get("/test_job/sharding/1/instance")).thenReturn("ip2@-@2341");
when(regCenter.get("/test_job/sharding/2/instance")).thenReturn("ip3@-@3412");
when(regCenter.get("/test_job/sharding/3/instance")).thenReturn("ip4@-@4123");
+ when(regCenter.get("/test_job/instances/ip1@-@1234")).thenReturn("jobInstanceId: ip1@-@1234\nserverIp: ip1\n");
+ when(regCenter.get("/test_job/instances/ip2@-@2341")).thenReturn("jobInstanceId: ip2@-@2341\nserverIp: ip2\n");
+ when(regCenter.get("/test_job/instances/ip3@-@3412")).thenReturn("jobInstanceId: ip3@-@3412\nserverIp: ip3\n");
+ when(regCenter.get("/test_job/instances/ip4@-@4123")).thenReturn("jobInstanceId: ip4@-@4123\nserverIp: ip4\n");
when(regCenter.isExisted("/test_job/instances/ip4@-@4123")).thenReturn(true);
when(regCenter.isExisted("/test_job/sharding/0/running")).thenReturn(true);
when(regCenter.isExisted("/test_job/sharding/1/running")).thenReturn(false);
@@ -69,23 +73,23 @@ public final class ShardingStatisticsAPIImplTest {
case 1:
assertThat(each.getStatus(), is(ShardingInfo.ShardingStatus.RUNNING));
assertThat(each.getServerIp(), is("ip1"));
- assertThat(each.getInstanceId(), is("1234"));
+ assertThat(each.getInstanceId(), is("ip1@-@1234"));
break;
case 2:
assertTrue(each.isFailover());
assertThat(each.getStatus(), is(ShardingInfo.ShardingStatus.SHARDING_FLAG));
assertThat(each.getServerIp(), is("ip2"));
- assertThat(each.getInstanceId(), is("2341"));
+ assertThat(each.getInstanceId(), is("ip2@-@2341"));
break;
case 3:
assertThat(each.getStatus(), is(ShardingInfo.ShardingStatus.DISABLED));
assertThat(each.getServerIp(), is("ip3"));
- assertThat(each.getInstanceId(), is("3412"));
+ assertThat(each.getInstanceId(), is("ip3@-@3412"));
break;
case 4:
assertThat(each.getStatus(), is(ShardingInfo.ShardingStatus.PENDING));
assertThat(each.getServerIp(), is("ip4"));
- assertThat(each.getInstanceId(), is("4123"));
+ assertThat(each.getInstanceId(), is("ip4@-@4123"));
break;
default:
break;