You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/14 17:15:00 UTC
[shardingsphere-elasticjob-lite] branch master updated: Upgrade
guava and curator version to newest (#1060)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push:
new d8f288d Upgrade guava and curator version to newest (#1060)
d8f288d is described below
commit d8f288dba00334c3140dbcb844fec98f46946647
Author: Liang Zhang <te...@163.com>
AuthorDate: Wed Jul 15 01:14:53 2020 +0800
Upgrade guava and curator version to newest (#1060)
* Upgrade guava and curator version to newest
* Use CuratorCache instead of deprecated TreeCache
* Use CuratorMultiTransaction instead of deprecated CuratorTransactionFinal
* revise deprecated KillSession.kill
* Use sha256 instead of deprecated md5
---
.../config/job/CloudJobConfigurationListener.java | 42 ++++----
.../scheduler/state/failover/FailoverService.java | 2 +-
.../job/CloudJobConfigurationListenerTest.java | 28 ++---
.../reg/zookeeper/ZookeeperRegistryCenter.java | 27 ++---
.../zookeeper/ZookeeperElectionServiceTest.java | 2 +-
.../ZookeeperRegistryCenterMiscellaneousTest.java | 4 +-
.../domain/EventTraceDataSourceFactory.java | 7 +-
.../security/UserAuthenticationService.java | 8 +-
.../internal/config/RescheduleListenerManager.java | 5 +-
.../internal/election/ElectionListenerManager.java | 8 +-
.../internal/failover/FailoverListenerManager.java | 7 +-
.../guarantee/GuaranteeListenerManager.java | 7 +-
.../internal/instance/ShutdownListenerManager.java | 3 +-
.../internal/instance/TriggerListenerManager.java | 3 +-
.../internal/listener/AbstractJobListener.java | 16 ++-
.../internal/listener/AbstractListenerManager.java | 4 +-
.../sharding/MonitorExecutionListenerManager.java | 5 +-
.../internal/sharding/ShardingListenerManager.java | 5 +-
.../lite/internal/sharding/ShardingService.java | 25 +++--
.../lite/internal/snapshot/SnapshotService.java | 15 +--
.../lite/internal/storage/JobNodeStorage.java | 25 +++--
.../storage/TransactionExecutionCallback.java | 12 ++-
.../config/RescheduleListenerManagerTest.java | 10 +-
.../election/ElectionListenerManagerTest.java | 22 ++--
.../failover/FailoverListenerManagerTest.java | 22 ++--
.../guarantee/GuaranteeListenerManagerTest.java | 14 +--
.../instance/ShutdownListenerManagerTest.java | 16 +--
.../instance/TriggerListenerManagerTest.java | 14 +--
.../lite/internal/listener/JobListenerTest.java | 14 +--
.../internal/listener/fixture/FooJobListener.java | 1 -
.../MonitorExecutionListenerManagerTest.java | 11 +-
.../sharding/ShardingListenerManagerTest.java | 20 ++--
.../internal/sharding/ShardingServiceTest.java | 49 ++++-----
.../lite/internal/storage/JobNodeStorageTest.java | 115 ++++++++++++---------
.../internal/reg/RegistryCenterFactory.java | 2 +-
examples/pom.xml | 2 +-
pom.xml | 4 +-
37 files changed, 290 insertions(+), 286 deletions(-)
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListener.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListener.java
index 3c12f9b..e990c02 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListener.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListener.java
@@ -18,11 +18,9 @@
package org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job;
import lombok.extern.slf4j.Slf4j;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.shardingsphere.elasticjob.cloud.config.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.CloudJobExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.config.pojo.CloudJobConfigurationPOJO;
@@ -38,7 +36,7 @@ import java.util.concurrent.Executors;
* Cloud job configuration change listener.
*/
@Slf4j
-public final class CloudJobConfigurationListener implements TreeCacheListener {
+public final class CloudJobConfigurationListener implements CuratorCacheListener {
private final CoordinatorRegistryCenter regCenter;
@@ -53,15 +51,15 @@ public final class CloudJobConfigurationListener implements TreeCacheListener {
}
@Override
- public void childEvent(final CuratorFramework client, final TreeCacheEvent event) {
- String path = null == event.getData() ? "" : event.getData().getPath();
- if (isJobConfigNode(event, path, Type.NODE_ADDED)) {
- CloudJobConfiguration cloudJobConfig = getCloudJobConfiguration(event);
+ public void event(final Type type, final ChildData oldData, final ChildData data) {
+ String path = data.getPath();
+ if (Type.NODE_CREATED == type && isJobConfigNode(path)) {
+ CloudJobConfiguration cloudJobConfig = getCloudJobConfiguration(data);
if (null != cloudJobConfig) {
producerManager.schedule(cloudJobConfig);
}
- } else if (isJobConfigNode(event, path, Type.NODE_UPDATED)) {
- CloudJobConfiguration cloudJobConfig = getCloudJobConfiguration(event);
+ } else if (Type.NODE_CHANGED == type && isJobConfigNode(path)) {
+ CloudJobConfiguration cloudJobConfig = getCloudJobConfiguration(data);
if (null == cloudJobConfig) {
return;
}
@@ -72,19 +70,19 @@ public final class CloudJobConfigurationListener implements TreeCacheListener {
readyService.setMisfireDisabled(cloudJobConfig.getJobConfig().getJobName());
}
producerManager.reschedule(cloudJobConfig.getJobConfig().getJobName());
- } else if (isJobConfigNode(event, path, Type.NODE_REMOVED)) {
+ } else if (Type.NODE_DELETED == type && isJobConfigNode(path)) {
String jobName = path.substring(CloudJobConfigurationNode.ROOT.length() + 1, path.length());
producerManager.unschedule(jobName);
}
}
- private boolean isJobConfigNode(final TreeCacheEvent event, final String path, final Type type) {
- return type == event.getType() && path.startsWith(CloudJobConfigurationNode.ROOT) && path.length() > CloudJobConfigurationNode.ROOT.length();
+ private boolean isJobConfigNode(final String path) {
+ return path.startsWith(CloudJobConfigurationNode.ROOT) && path.length() > CloudJobConfigurationNode.ROOT.length();
}
- private CloudJobConfiguration getCloudJobConfiguration(final TreeCacheEvent event) {
+ private CloudJobConfiguration getCloudJobConfiguration(final ChildData data) {
try {
- return YamlEngine.unmarshal(new String(event.getData().getData()), CloudJobConfigurationPOJO.class).toCloudJobConfiguration();
+ return YamlEngine.unmarshal(new String(data.getData()), CloudJobConfigurationPOJO.class).toCloudJobConfiguration();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
log.warn("Wrong Cloud Job Configuration with:", ex.getMessage());
@@ -97,22 +95,22 @@ public final class CloudJobConfigurationListener implements TreeCacheListener {
* Start the listener service of the cloud job service.
*/
public void start() {
- getCache().getListenable().addListener(this, Executors.newSingleThreadExecutor());
+ getCache().listenable().addListener(this, Executors.newSingleThreadExecutor());
}
/**
* Stop the listener service of the cloud job service.
*/
public void stop() {
- getCache().getListenable().removeListener(this);
+ getCache().listenable().removeListener(this);
}
- private TreeCache getCache() {
- TreeCache result = (TreeCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
+ private CuratorCache getCache() {
+ CuratorCache result = (CuratorCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
if (null != result) {
return result;
}
regCenter.addCacheData(CloudJobConfigurationNode.ROOT);
- return (TreeCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
+ return (CuratorCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
}
}
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/failover/FailoverService.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/failover/FailoverService.java
index b0ca769..44a0400 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/failover/FailoverService.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/failover/FailoverService.java
@@ -114,7 +114,7 @@ public final class FailoverService {
List<Integer> result = new ArrayList<>(taskIdList.size());
for (String each : taskIdList) {
MetaInfo metaInfo = MetaInfo.from(each);
- if (assignedTasks.add(Hashing.md5().newHasher().putString(jobName, Charsets.UTF_8).putInt(metaInfo.getShardingItems().get(0)).hash()) && !runningService.isTaskRunning(metaInfo)) {
+ if (assignedTasks.add(Hashing.sha256().newHasher().putString(jobName, Charsets.UTF_8).putInt(metaInfo.getShardingItems().get(0)).hash()) && !runningService.isTaskRunning(metaInfo)) {
result.add(metaInfo.getShardingItems().get(0));
}
}
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java
index 23a48d9..e1501fd 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
import org.apache.shardingsphere.elasticjob.cloud.config.CloudJobExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.CloudJsonConstants;
@@ -56,16 +56,8 @@ public final class CloudJobConfigurationListenerTest {
}
@Test
- public void assertChildEventWhenDataIsNull() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null));
- verify(producerManager, times(0)).schedule(ArgumentMatchers.any());
- verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
- verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
- }
-
- @Test
public void assertChildEventWhenIsNotConfigPath() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/other/test_job", null, "".getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_CHANGED, null, new ChildData("/other/test_job", null, "".getBytes()));
verify(producerManager, times(0)).schedule(ArgumentMatchers.any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
@@ -73,7 +65,7 @@ public final class CloudJobConfigurationListenerTest {
@Test
public void assertChildEventWhenIsRootConfigPath() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/config/job", null, "".getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_DELETED, null, new ChildData("/config/job", null, "".getBytes()));
verify(producerManager, times(0)).schedule(ArgumentMatchers.any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
@@ -81,7 +73,7 @@ public final class CloudJobConfigurationListenerTest {
@Test
public void assertChildEventWhenStateIsAddAndIsConfigPathAndInvalidData() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData("/config/job/test_job", null, "".getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_CREATED, null, new ChildData("/config/job/test_job", null, "".getBytes()));
verify(producerManager, times(0)).schedule(ArgumentMatchers.any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
@@ -89,36 +81,34 @@ public final class CloudJobConfigurationListenerTest {
@Test
public void assertChildEventWhenStateIsAddAndIsConfigPath() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_CREATED, null, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes()));
verify(producerManager).schedule(ArgumentMatchers.any());
}
@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndTransientJob() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_CHANGED, null, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes()));
verify(readyService, times(0)).remove(Collections.singletonList("test_job"));
verify(producerManager).reschedule(ArgumentMatchers.any());
}
@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndDaemonJob() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED,
- new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_CHANGED, null, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes()));
verify(readyService).remove(Collections.singletonList("test_job"));
verify(producerManager).reschedule(ArgumentMatchers.any());
}
@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndMisfireDisabled() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED,
- new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(false).getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_CHANGED, null, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(false).getBytes()));
verify(readyService).setMisfireDisabled("test_job");
verify(producerManager).reschedule(ArgumentMatchers.any());
}
@Test
public void assertChildEventWhenStateIsRemovedAndIsJobConfigPath() {
- cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/config/job/test_job", null, "".getBytes())));
+ cloudJobConfigurationListener.event(Type.NODE_DELETED, null, new ChildData("/config/job/test_job", null, "".getBytes()));
verify(producerManager).unschedule("test_job");
}
}
diff --git a/elasticjob-infra/elasticjob-registry-center/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java b/elasticjob-infra/elasticjob-registry-center/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
index 0a507fb..dfd611a 100644
--- a/elasticjob-infra/elasticjob-registry-center/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
+++ b/elasticjob-infra/elasticjob-registry-center/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
@@ -26,8 +26,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
@@ -43,6 +44,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -55,7 +57,7 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
@Getter(AccessLevel.PROTECTED)
private ZookeeperConfiguration zkConfig;
- private final Map<String, TreeCache> caches = new ConcurrentHashMap<>();
+ private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
@Getter
private CuratorFramework client;
@@ -108,7 +110,7 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
@Override
public void close() {
- for (Entry<String, TreeCache> each : caches.entrySet()) {
+ for (Entry<String, CuratorCache> each : caches.entrySet()) {
each.getValue().close();
}
waitForCacheClose();
@@ -130,19 +132,19 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
@Override
public String get(final String key) {
- TreeCache cache = findTreeCache(key);
+ CuratorCache cache = findCuratorCache(key);
if (null == cache) {
return getDirectly(key);
}
- ChildData resultInCache = cache.getCurrentData(key);
- if (null != resultInCache) {
- return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
+ Optional<ChildData> resultInCache = cache.get(key);
+ if (resultInCache.isPresent()) {
+ return null == resultInCache.get().getData() ? null : new String(resultInCache.get().getData(), Charsets.UTF_8);
}
return getDirectly(key);
}
- private TreeCache findTreeCache(final String key) {
- for (Entry<String, TreeCache> entry : caches.entrySet()) {
+ private CuratorCache findCuratorCache(final String key) {
+ for (Entry<String, CuratorCache> entry : caches.entrySet()) {
if (key.startsWith(entry.getKey())) {
return entry.getValue();
}
@@ -221,7 +223,8 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
@Override
public void update(final String key, final String value) {
try {
- client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
+ TransactionOp transactionOp = client.transactionOp();
+ client.transaction().forOperations(transactionOp.check().forPath(key), transactionOp.setData().forPath(key, value.getBytes(Charsets.UTF_8)));
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
@@ -299,7 +302,7 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
@Override
public void addCacheData(final String cachePath) {
- TreeCache cache = new TreeCache(client, cachePath);
+ CuratorCache cache = CuratorCache.build(client, cachePath);
try {
cache.start();
//CHECKSTYLE:OFF
@@ -312,7 +315,7 @@ public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter
@Override
public void evictCacheData(final String cachePath) {
- TreeCache cache = caches.remove(cachePath + "/");
+ CuratorCache cache = caches.remove(cachePath + "/");
if (null != cache) {
cache.close();
}
diff --git a/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperElectionServiceTest.java b/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperElectionServiceTest.java
index aee43d3..4e1d02c 100644
--- a/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperElectionServiceTest.java
+++ b/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperElectionServiceTest.java
@@ -62,7 +62,7 @@ public class ZookeeperElectionServiceTest {
anotherClient.start();
anotherClient.blockUntilConnected();
anotherService.start();
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), EmbedTestingServer.getConnectionString());
+ KillSession.kill(client.getZookeeperClient().getZooKeeper());
service.stop();
verify(anotherElectionCandidate).startLeadership();
}
diff --git a/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java b/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java
index 4240161..fe698f9 100644
--- a/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java
+++ b/elasticjob-infra/elasticjob-registry-center/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterMiscellaneousTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.elasticjob.reg.zookeeper;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.fixture.EmbedTestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -57,7 +57,7 @@ public final class ZookeeperRegistryCenterMiscellaneousTest {
@Test
public void assertGetRawCache() {
- assertThat(zkRegCenter.getRawCache("/test"), instanceOf(TreeCache.class));
+ assertThat(zkRegCenter.getRawCache("/test"), instanceOf(CuratorCache.class));
}
@Test
diff --git a/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/domain/EventTraceDataSourceFactory.java b/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/domain/EventTraceDataSourceFactory.java
index f43b302..a5dd6cd 100644
--- a/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/domain/EventTraceDataSourceFactory.java
+++ b/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/domain/EventTraceDataSourceFactory.java
@@ -17,17 +17,16 @@
package org.apache.shardingsphere.elasticjob.lite.console.domain;
-import java.util.concurrent.ConcurrentHashMap;
-
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
-
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Event trace data source factory.
*/
@@ -46,7 +45,7 @@ public final class EventTraceDataSourceFactory {
* @return event trace data source
*/
public static EventTraceDataSource createEventTraceDataSource(final String driverClassName, final String url, final String username, final String password) {
- Hasher hasher = Hashing.md5().newHasher().putString(driverClassName, Charsets.UTF_8).putString(url, Charsets.UTF_8);
+ Hasher hasher = Hashing.sha256().newHasher().putString(driverClassName, Charsets.UTF_8).putString(url, Charsets.UTF_8);
if (!Strings.isNullOrEmpty(username)) {
hasher.putString(username, Charsets.UTF_8);
}
diff --git a/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/security/UserAuthenticationService.java b/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/security/UserAuthenticationService.java
index c228111..3751117 100644
--- a/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/security/UserAuthenticationService.java
+++ b/elasticjob-lite/elasticjob-lite-console/src/main/java/org/apache/shardingsphere/elasticjob/lite/console/security/UserAuthenticationService.java
@@ -66,7 +66,6 @@ public class UserAuthenticationService {
String response = authorizationMap.get("response");
String password;
AuthenticationResult authenticationResult;
-
if (rootUsername.equals(username)) {
password = rootPassword;
authenticationResult = new AuthenticationResult(true, false);
@@ -76,10 +75,9 @@ public class UserAuthenticationService {
} else {
return new AuthenticationResult(false, false);
}
-
- String hash1 = Hashing.md5().hashBytes((username + ":" + realm + ":" + password).getBytes()).toString();
- String hash2 = Hashing.md5().hashBytes((method + ":" + uri).getBytes()).toString();
- String exceptResponse = Hashing.md5().hashBytes((hash1 + ":" + nonce + ":" + nc + ":" + cnonce + ":" + qop + ":" + hash2).getBytes()).toString();
+ String hash1 = Hashing.sha256().hashBytes((username + ":" + realm + ":" + password).getBytes()).toString();
+ String hash2 = Hashing.sha256().hashBytes((method + ":" + uri).getBytes()).toString();
+ String exceptResponse = Hashing.sha256().hashBytes((hash1 + ":" + nonce + ":" + nc + ":" + cnonce + ":" + qop + ":" + hash2).getBytes()).toString();
if (StringUtils.equals(response, exceptResponse)) {
return authenticationResult;
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManager.java
index 1382bb9..9b594e4 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManager.java
@@ -17,13 +17,12 @@
package org.apache.shardingsphere.elasticjob.lite.internal.config;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
/**
* Reschedule listener manager.
@@ -49,7 +48,7 @@ public final class RescheduleListenerManager extends AbstractListenerManager {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
+ if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getCron());
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java
index 223a7fd..4f4a02b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManager.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.elasticjob.lite.internal.election;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
@@ -74,14 +73,11 @@ public final class ElectionListenerManager extends AbstractListenerManager {
private boolean isPassiveElection(final String path, final Type eventType) {
JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
- if (Objects.isNull(jobInstance)) {
- return false;
- }
- return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getIp());
+ return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getIp());
}
private boolean isLeaderCrashed(final String path, final Type eventType) {
- return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
+ return leaderNode.isLeaderInstancePath(path) && Type.NODE_DELETED == eventType;
}
private boolean isLocalServerEnabled(final String path, final String data) {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
index 92296b8..fe2396b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationNode;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.config.pojo.JobConfigurationPOJO;
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListe
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import java.util.List;
@@ -74,7 +73,7 @@ public final class FailoverListenerManager extends AbstractListenerManager {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
+ if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_DELETED == eventType && instanceNode.isInstancePath(path)) {
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
@@ -99,7 +98,7 @@ public final class FailoverListenerManager extends AbstractListenerManager {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isFailover()) {
+ if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isFailover()) {
failoverService.removeFailoverInfo();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java
index 2202fca..a847f0d 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java
@@ -17,12 +17,11 @@
package org.apache.shardingsphere.elasticjob.lite.internal.guarantee;
-import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import org.apache.shardingsphere.elasticjob.api.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import java.util.List;
@@ -51,7 +50,7 @@ public final class GuaranteeListenerManager extends AbstractListenerManager {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) {
+ if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
@@ -65,7 +64,7 @@ public final class GuaranteeListenerManager extends AbstractListenerManager {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (Type.NODE_REMOVED == eventType && guaranteeNode.isCompletedRootNode(path)) {
+ if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManager.java
index 91e302f..0d863f0 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManager.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.elasticjob.lite.internal.instance;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -61,7 +60,7 @@ public final class ShutdownListenerManager extends AbstractListenerManager {
}
private boolean isRemoveInstance(final String path, final Type eventType) {
- return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
+ return instanceNode.isLocalInstancePath(path) && Type.NODE_DELETED == eventType;
}
private boolean isReconnectedRegistryCenter() {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManager.java
index 63024da..e6fbb4d 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManager.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobLi
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
/**
* Job trigger listener manager.
@@ -50,7 +49,7 @@ public final class TriggerListenerManager extends AbstractListenerManager {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_UPDATED != eventType) {
+ if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_CHANGED != eventType) {
return;
}
instanceService.clearTriggerFlag();
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractJobListener.java
index 965d4c3..1d0379d 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractJobListener.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractJobListener.java
@@ -18,28 +18,24 @@
package org.apache.shardingsphere.elasticjob.lite.internal.listener;
import com.google.common.base.Charsets;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
/**
* Job Listener.
*/
-public abstract class AbstractJobListener implements TreeCacheListener {
+public abstract class AbstractJobListener implements CuratorCacheListener {
@Override
- public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) {
- ChildData childData = event.getData();
- if (null == childData) {
+ public final void event(final Type type, final ChildData oldData, final ChildData data) {
+ if (null == data) {
return;
}
- String path = childData.getPath();
+ String path = data.getPath();
if (path.isEmpty()) {
return;
}
- dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
+ dataChanged(path, type, null == data.getData() ? "" : new String(data.getData(), Charsets.UTF_8));
}
protected abstract void dataChanged(String path, Type eventType, String data);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractListenerManager.java
index 354ccaf..25f4f17 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/AbstractListenerManager.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.listener;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
@@ -37,7 +37,7 @@ public abstract class AbstractListenerManager {
*/
public abstract void start();
- protected void addDataListener(final TreeCacheListener listener) {
+ protected void addDataListener(final CuratorCacheListener listener) {
jobNodeStorage.addDataListener(listener);
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManager.java
index 9388380..12ad6b1 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManager.java
@@ -17,13 +17,12 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationNode;
import org.apache.shardingsphere.elasticjob.lite.internal.config.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
/**
* Monitor execution listener manager.
@@ -49,7 +48,7 @@ public final class MonitorExecutionListenerManager extends AbstractListenerManag
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
- if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isMonitorExecution()) {
+ if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isMonitorExecution()) {
executionService.clearAllRunningInfo();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManager.java
index f0a00c1..f553293 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManager.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationNode;
import org.apache.shardingsphere.elasticjob.lite.internal.config.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListe
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerNode;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
/**
* Sharding listener manager.
@@ -82,7 +81,7 @@ public final class ShardingListenerManager extends AbstractListenerManager {
}
private boolean isInstanceChange(final Type eventType, final String path) {
- return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
+ return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
}
private boolean isServerChange(final String path) {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index 59b7866..e97c0c8 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -17,10 +17,15 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategyFactory;
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
@@ -31,15 +36,12 @@ import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.TransactionExecutionCallback;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
/**
* Sharding service.
@@ -200,14 +202,17 @@ public final class ShardingService {
private final Map<JobInstance, List<Integer>> shardingResults;
@Override
- public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
- for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
+ public List<CuratorOp> createCuratorOperators(final TransactionOp transactionOp) throws Exception {
+ List<CuratorOp> result = new LinkedList<>();
+ for (Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
for (int shardingItem : entry.getValue()) {
- curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
+ result.add(transactionOp.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()));
}
}
- curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
- curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
+ System.out.println(jobNodePath.getFullPath(ShardingNode.NECESSARY));
+ result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)));
+ result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)));
+ return result;
}
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java
index ddd49ff..609ad0b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.elasticjob.lite.internal.snapshot;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.shardingsphere.elasticjob.lite.internal.util.SensitiveInfoUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
@@ -32,6 +32,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.StringJoiner;
import java.util.stream.Collectors;
@@ -113,14 +114,14 @@ public final class SnapshotService {
if (null == zkValue) {
zkValue = "";
}
- TreeCache treeCache = (TreeCache) regCenter.getRawCache("/" + jobName);
- ChildData treeCacheData = treeCache.getCurrentData(zkPath);
- String treeCachePath = null == treeCacheData ? "" : treeCacheData.getPath();
- String treeCacheValue = null == treeCacheData ? "" : new String(treeCacheData.getData());
- if (zkValue.equals(treeCacheValue) && zkPath.equals(treeCachePath)) {
+ CuratorCache cache = (CuratorCache) regCenter.getRawCache("/" + jobName);
+ Optional<ChildData> cacheData = cache.get(zkPath);
+ String cachePath = cacheData.map(ChildData::getPath).orElse("");
+ String cacheValue = cacheData.map(childData -> new String(childData.getData())).orElse("");
+ if (zkValue.equals(cacheValue) && zkPath.equals(cachePath)) {
result.add(new StringJoiner(" | ").add(zkPath).add(zkValue).toString());
} else {
- result.add(new StringJoiner(" | ").add(zkPath).add(zkValue).add(treeCachePath).add(treeCacheValue).toString());
+ result.add(new StringJoiner(" | ").add(zkPath).add(zkValue).add(cachePath).add(cacheValue).toString());
}
dumpDirectly(zkPath, jobName, result);
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
index 9a06651..63e8eef 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
@@ -18,15 +18,17 @@
package org.apache.shardingsphere.elasticjob.lite.internal.storage;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
+import java.util.LinkedList;
import java.util.List;
/**
@@ -180,13 +182,16 @@ public final class JobNodeStorage {
/**
* Execute operator in transaction.
*
- * @param callback execute callback
+ * @param callback transaction execution callback
*/
public void executeInTransaction(final TransactionExecutionCallback callback) {
try {
- CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
- callback.execute(curatorTransactionFinal);
- curatorTransactionFinal.commit();
+ List<CuratorOp> operations = new LinkedList<>();
+ CuratorFramework client = getClient();
+ TransactionOp transactionOp = client.transactionOp();
+ operations.add(transactionOp.check().forPath("/"));
+ operations.addAll(callback.createCuratorOperators(transactionOp));
+ client.transaction().forOperations(operations);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
@@ -238,9 +243,9 @@ public final class JobNodeStorage {
*
* @param listener data listener
*/
- public void addDataListener(final TreeCacheListener listener) {
- TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
- cache.getListenable().addListener(listener);
+ public void addDataListener(final CuratorCacheListener listener) {
+ CuratorCache cache = (CuratorCache) regCenter.getRawCache("/" + jobName);
+ cache.listenable().addListener(listener);
}
/**
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/TransactionExecutionCallback.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/TransactionExecutionCallback.java
index ddf54af..be0296d 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/TransactionExecutionCallback.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/TransactionExecutionCallback.java
@@ -17,7 +17,10 @@
package org.apache.shardingsphere.elasticjob.lite.internal.storage;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+
+import java.util.List;
/**
* Transaction execution callback.
@@ -25,10 +28,11 @@ import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
public interface TransactionExecutionCallback {
/**
- * Execute in transaction.
+ * Create curator operators.
*
- * @param curatorTransactionFinal transaction execution context
+ * @param transactionOp transaction operation
+ * @return curator operations
* @throws Exception exception
*/
- void execute(CuratorTransactionFinal curatorTransactionFinal) throws Exception;
+ List<CuratorOp> createCuratorOperators(TransactionOp transactionOp) throws Exception;
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManagerTest.java
index df4d28c..8dfba56 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/RescheduleListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.config;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -62,19 +62,19 @@ public final class RescheduleListenerManagerTest {
@Test
public void assertCronSettingChangedJobListenerWhenIsNotCronPath() {
- rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config/other", Type.NODE_ADDED, LiteYamlConstants.getJobYaml());
+ rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config/other", Type.NODE_CREATED, LiteYamlConstants.getJobYaml());
verify(jobScheduleController, times(0)).rescheduleJob(ArgumentMatchers.any());
}
@Test
public void assertCronSettingChangedJobListenerWhenIsCronPathButNotUpdate() {
- rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_ADDED, LiteYamlConstants.getJobYaml());
+ rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_CREATED, LiteYamlConstants.getJobYaml());
verify(jobScheduleController, times(0)).rescheduleJob(ArgumentMatchers.any());
}
@Test
public void assertCronSettingChangedJobListenerWhenIsCronPathAndUpdateButCannotFindJob() {
- rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_UPDATED, LiteYamlConstants.getJobYaml());
+ rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYaml());
verify(jobScheduleController, times(0)).rescheduleJob(ArgumentMatchers.any());
}
@@ -83,7 +83,7 @@ public final class RescheduleListenerManagerTest {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_UPDATED, LiteYamlConstants.getJobYaml());
+ rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYaml());
verify(jobScheduleController).rescheduleJob("0/1 * * * * ?");
JobRegistry.getInstance().shutdown("test_job");
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java
index 6ea5255..d9b59d5 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/election/ElectionListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.election;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
@@ -73,25 +73,25 @@ public final class ElectionListenerManagerTest {
@Test
public void assertIsNotLeaderInstancePathAndServerPath() {
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/other", Type.NODE_REMOVED, "127.0.0.1");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/other", Type.NODE_DELETED, "127.0.0.1");
verify(leaderService, times(0)).electLeader();
}
@Test
public void assertLeaderElectionWhenAddLeaderInstancePath() {
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_ADDED, "127.0.0.1");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_CREATED, "127.0.0.1");
verify(leaderService, times(0)).electLeader();
}
@Test
public void assertLeaderElectionWhenRemoveLeaderInstancePathWithoutAvailableServers() {
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_REMOVED, "127.0.0.1");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_DELETED, "127.0.0.1");
verify(leaderService, times(0)).electLeader();
}
@Test
public void assertLeaderElectionWhenRemoveLeaderInstancePathWithAvailableServerButJobInstanceIsShutdown() {
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_REMOVED, "127.0.0.1");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_DELETED, "127.0.0.1");
verify(leaderService, times(0)).electLeader();
}
@@ -100,20 +100,20 @@ public final class ElectionListenerManagerTest {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(serverService.isAvailableServer("127.0.0.1")).thenReturn(true);
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_REMOVED, "127.0.0.1");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/leader/election/instance", Type.NODE_DELETED, "127.0.0.1");
verify(leaderService).electLeader();
JobRegistry.getInstance().shutdown("test_job");
}
@Test
public void assertLeaderElectionWhenServerDisableWithoutLeader() {
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_REMOVED, ServerStatus.DISABLED.name());
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_DELETED, ServerStatus.DISABLED.name());
verify(leaderService, times(0)).electLeader();
}
@Test
public void assertLeaderElectionWhenServerEnableWithLeader() {
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_UPDATED, "");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_CHANGED, "");
verify(leaderService, times(0)).electLeader();
}
@@ -121,21 +121,21 @@ public final class ElectionListenerManagerTest {
public void assertLeaderElectionWhenServerEnableWithoutLeader() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_UPDATED, "");
+ electionListenerManager.new LeaderElectionJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_CHANGED, "");
verify(leaderService).electLeader();
JobRegistry.getInstance().shutdown("test_job");
}
@Test
public void assertLeaderAbdicationWhenFollowerDisable() {
- electionListenerManager.new LeaderAbdicationJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_UPDATED, ServerStatus.DISABLED.name());
+ electionListenerManager.new LeaderAbdicationJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_CHANGED, ServerStatus.DISABLED.name());
verify(leaderService, times(0)).removeLeader();
}
@Test
public void assertLeaderAbdicationWhenLeaderDisable() {
when(leaderService.isLeader()).thenReturn(true);
- electionListenerManager.new LeaderAbdicationJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_UPDATED, ServerStatus.DISABLED.name());
+ electionListenerManager.new LeaderAbdicationJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_CHANGED, ServerStatus.DISABLED.name());
verify(leaderService).removeLeader();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
index 0f37c3c..4d4e93b 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
@@ -78,7 +78,7 @@ public final class FailoverListenerManagerTest {
@Test
public void assertJobCrashedJobListenerWhenFailoverDisabled() {
- failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(failoverService, times(0)).failoverIfNecessary();
}
@@ -87,7 +87,7 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
- failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_ADDED, "");
+ failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CREATED, "");
verify(failoverService, times(0)).failoverIfNecessary();
JobRegistry.getInstance().shutdown("test_job");
}
@@ -97,7 +97,7 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
- failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/other/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/other/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(failoverService, times(0)).failoverIfNecessary();
JobRegistry.getInstance().shutdown("test_job");
}
@@ -107,7 +107,7 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
- failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(failoverService, times(0)).failoverIfNecessary();
JobRegistry.getInstance().shutdown("test_job");
}
@@ -118,7 +118,7 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
when(shardingService.getShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
- failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_REMOVED, "");
+ failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_DELETED, "");
verify(failoverService).setCrashedFailoverFlag(0);
verify(failoverService).setCrashedFailoverFlag(2);
verify(failoverService, times(2)).failoverIfNecessary();
@@ -131,7 +131,7 @@ public final class FailoverListenerManagerTest {
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
when(failoverService.getFailoverItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
- failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_REMOVED, "");
+ failoverListenerManager.new JobCrashedJobListener().dataChanged("/test_job/instances/127.0.0.1@-@1", Type.NODE_DELETED, "");
verify(failoverService).setCrashedFailoverFlag(1);
verify(failoverService).failoverIfNecessary();
JobRegistry.getInstance().shutdown("test_job");
@@ -139,25 +139,25 @@ public final class FailoverListenerManagerTest {
@Test
public void assertFailoverSettingsChangedJobListenerWhenIsNotFailoverPath() {
- failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/other", Type.NODE_ADDED, LiteYamlConstants.getJobYaml());
+ failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/other", Type.NODE_CREATED, LiteYamlConstants.getJobYaml());
verify(failoverService, times(0)).removeFailoverInfo();
}
@Test
public void assertFailoverSettingsChangedJobListenerWhenIsFailoverPathButNotUpdate() {
- failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_ADDED, "");
+ failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_CREATED, "");
verify(failoverService, times(0)).removeFailoverInfo();
}
@Test
public void assertFailoverSettingsChangedJobListenerWhenIsFailoverPathAndUpdateButEnableFailover() {
- failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_UPDATED, LiteYamlConstants.getJobYaml());
+ failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYaml());
verify(failoverService, times(0)).removeFailoverInfo();
}
@Test
public void assertFailoverSettingsChangedJobListenerWhenIsFailoverPathAndUpdateButDisableFailover() {
- failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_UPDATED, LiteYamlConstants.getJobYamlWithFailover(false));
+ failoverListenerManager.new FailoverSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYamlWithFailover(false));
verify(failoverService).removeFailoverInfo();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java
index 2fa3eb1..2007a13 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.guarantee;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import org.apache.shardingsphere.elasticjob.api.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
@@ -63,37 +63,37 @@ public final class GuaranteeListenerManagerTest {
@Test
public void assertStartedNodeRemovedJobListenerWhenIsNotRemoved() {
- guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_UPDATED, "");
+ guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_CHANGED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}
@Test
public void assertStartedNodeRemovedJobListenerWhenIsNotStartedNode() {
- guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/other_job/guarantee/started", Type.NODE_REMOVED, "");
+ guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/other_job/guarantee/started", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}
@Test
public void assertStartedNodeRemovedJobListenerWhenIsRemovedAndStartedNode() {
- guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_REMOVED, "");
+ guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener).notifyWaitingTaskStart();
}
@Test
public void assertCompletedNodeRemovedJobListenerWhenIsNotRemoved() {
- guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_UPDATED, "");
+ guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_CHANGED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}
@Test
public void assertCompletedNodeRemovedJobListenerWhenIsNotCompletedNode() {
- guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/other_job/guarantee/completed", Type.NODE_REMOVED, "");
+ guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/other_job/guarantee/completed", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}
@Test
public void assertCompletedNodeRemovedJobListenerWhenIsRemovedAndCompletedNode() {
- guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_REMOVED, "");
+ guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManagerTest.java
index 2c59c08..2023c51 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/ShutdownListenerManagerTest.java
@@ -17,14 +17,14 @@
package org.apache.shardingsphere.elasticjob.lite.internal.instance;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.SchedulerFacade;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -79,7 +79,7 @@ public final class ShutdownListenerManagerTest {
@Test
public void assertIsShutdownAlready() {
- shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(schedulerFacade, times(0)).shutdownInstance();
}
@@ -87,7 +87,7 @@ public final class ShutdownListenerManagerTest {
public void assertIsNotLocalInstancePath() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.2@-@0", Type.NODE_REMOVED, "");
+ shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.2@-@0", Type.NODE_DELETED, "");
verify(schedulerFacade, times(0)).shutdownInstance();
}
@@ -95,7 +95,7 @@ public final class ShutdownListenerManagerTest {
public void assertUpdateLocalInstancePath() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_UPDATED, "");
+ shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CHANGED, "");
verify(schedulerFacade, times(0)).shutdownInstance();
}
@@ -104,7 +104,7 @@ public final class ShutdownListenerManagerTest {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(jobScheduleController.isPaused()).thenReturn(true);
- shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(schedulerFacade, times(0)).shutdownInstance();
}
@@ -113,7 +113,7 @@ public final class ShutdownListenerManagerTest {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(instanceService.isLocalJobInstanceExisted()).thenReturn(true);
- shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(schedulerFacade, times(0)).shutdownInstance();
}
@@ -121,7 +121,7 @@ public final class ShutdownListenerManagerTest {
public void assertRemoveLocalInstancePath() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_REMOVED, "");
+ shutdownListenerManager.new InstanceShutdownStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_DELETED, "");
verify(schedulerFacade).shutdownInstance();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManagerTest.java
index db6ddac..6ff2935 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/instance/TriggerListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.instance;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
@@ -67,25 +67,25 @@ public final class TriggerListenerManagerTest {
@Test
public void assertNotTriggerWhenIsNotTriggerOperation() {
- triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_UPDATED, "");
+ triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CHANGED, "");
verify(instanceService, times(0)).clearTriggerFlag();
}
@Test
public void assertNotTriggerWhenIsNotLocalInstancePath() {
- triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.2@-@0", Type.NODE_UPDATED, InstanceOperation.TRIGGER.name());
+ triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.2@-@0", Type.NODE_CHANGED, InstanceOperation.TRIGGER.name());
verify(instanceService, times(0)).clearTriggerFlag();
}
@Test
public void assertNotTriggerWhenIsNotUpdate() {
- triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_ADDED, InstanceOperation.TRIGGER.name());
+ triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CREATED, InstanceOperation.TRIGGER.name());
verify(instanceService, times(0)).clearTriggerFlag();
}
@Test
public void assertTriggerWhenJobScheduleControllerIsNull() {
- triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_UPDATED, InstanceOperation.TRIGGER.name());
+ triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CHANGED, InstanceOperation.TRIGGER.name());
verify(instanceService).clearTriggerFlag();
verify(jobScheduleController, times(0)).triggerJob();
}
@@ -95,7 +95,7 @@ public final class TriggerListenerManagerTest {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
JobRegistry.getInstance().setJobRunning("test_job", true);
- triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_UPDATED, InstanceOperation.TRIGGER.name());
+ triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CHANGED, InstanceOperation.TRIGGER.name());
verify(instanceService).clearTriggerFlag();
verify(jobScheduleController, times(0)).triggerJob();
JobRegistry.getInstance().setJobRunning("test_job", false);
@@ -106,7 +106,7 @@ public final class TriggerListenerManagerTest {
public void assertTriggerWhenJobIsNotRunning() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_UPDATED, InstanceOperation.TRIGGER.name());
+ triggerListenerManager.new JobTriggerStatusJobListener().dataChanged("/test_job/instances/127.0.0.1@-@0", Type.NODE_CHANGED, InstanceOperation.TRIGGER.name());
verify(instanceService).clearTriggerFlag();
verify(jobScheduleController).triggerJob();
JobRegistry.getInstance().shutdown("test_job");
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java
index 3987b42..0f99de0 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java
@@ -17,9 +17,8 @@
package org.apache.shardingsphere.elasticjob.lite.internal.listener;
-import org.apache.shardingsphere.elasticjob.lite.internal.listener.fixture.FooJobListener;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.shardingsphere.elasticjob.lite.internal.listener.fixture.FooJobListener;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,7 +35,7 @@ import static org.mockito.Mockito.when;
public final class JobListenerTest {
@Mock
- private TreeCacheEvent event;
+ private ChildData childData;
@Mock
private List list;
@@ -50,15 +49,16 @@ public final class JobListenerTest {
@Test
public void assertChildEventWhenEventDataIsEmpty() {
- when(event.getData()).thenReturn(null);
- fooJobListener.childEvent(null, event);
+ when(childData.getPath()).thenReturn("");
+ fooJobListener.event(null, null, childData);
verify(list, times(0)).clear();
}
@Test
public void assertChildEventSuccess() {
- when(event.getData()).thenReturn(new ChildData("/test_job", null, null));
- fooJobListener.childEvent(null, event);
+ when(childData.getPath()).thenReturn("/test");
+ when(childData.getData()).thenReturn("test".getBytes());
+ fooJobListener.event(null, null, childData);
verify(list).clear();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/fixture/FooJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/fixture/FooJobListener.java
index a8dbc5f..7318aab 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/fixture/FooJobListener.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/fixture/FooJobListener.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.elasticjob.lite.internal.listener.fixture;
import lombok.RequiredArgsConstructor;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
import java.util.List;
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManagerTest.java
index 8e9631b..18aa0d4 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/MonitorExecutionListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -49,26 +49,25 @@ public final class MonitorExecutionListenerManagerTest {
@Test
public void assertMonitorExecutionSettingsChangedJobListenerWhenIsNotFailoverPath() {
- monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/other", TreeCacheEvent.Type.NODE_ADDED, LiteYamlConstants.getJobYaml());
+ monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/other", Type.NODE_CREATED, LiteYamlConstants.getJobYaml());
verify(executionService, times(0)).clearAllRunningInfo();
}
@Test
public void assertMonitorExecutionSettingsChangedJobListenerWhenIsFailoverPathButNotUpdate() {
- monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/config", TreeCacheEvent.Type.NODE_ADDED, "");
+ monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_CREATED, "");
verify(executionService, times(0)).clearAllRunningInfo();
}
@Test
public void assertMonitorExecutionSettingsChangedJobListenerWhenIsFailoverPathAndUpdateButEnableFailover() {
- monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/config", TreeCacheEvent.Type.NODE_UPDATED, LiteYamlConstants.getJobYaml());
+ monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYaml());
verify(executionService, times(0)).clearAllRunningInfo();
}
@Test
public void assertMonitorExecutionSettingsChangedJobListenerWhenIsFailoverPathAndUpdateButDisableFailover() {
- monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged(
- "/test_job/config", TreeCacheEvent.Type.NODE_UPDATED, LiteYamlConstants.getJobYamlWithMonitorExecution(false));
+ monitorExecutionListenerManager.new MonitorExecutionSettingsChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYamlWithMonitorExecution(false));
verify(executionService).clearAllRunningInfo();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManagerTest.java
index 2caf51d..df34059 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingListenerManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractJobListener;
@@ -69,20 +69,20 @@ public final class ShardingListenerManagerTest {
@Test
public void assertShardingTotalCountChangedJobListenerWhenIsNotConfigPath() {
- shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config/other", Type.NODE_ADDED, "");
+ shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config/other", Type.NODE_CREATED, "");
verify(shardingService, times(0)).setReshardingFlag();
}
@Test
public void assertShardingTotalCountChangedJobListenerWhenIsConfigPathButCurrentShardingTotalCountIsZero() {
- shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config", Type.NODE_ADDED, LiteYamlConstants.getJobYaml());
+ shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config", Type.NODE_CREATED, LiteYamlConstants.getJobYaml());
verify(shardingService, times(0)).setReshardingFlag();
}
@Test
public void assertShardingTotalCountChangedJobListenerWhenIsConfigPathAndCurrentShardingTotalCountIsEqualToNewShardingTotalCount() {
JobRegistry.getInstance().setCurrentShardingTotalCount("test_job", 3);
- shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config", Type.NODE_ADDED, LiteYamlConstants.getJobYaml());
+ shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config", Type.NODE_CREATED, LiteYamlConstants.getJobYaml());
verify(shardingService, times(0)).setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount("test_job", 0);
}
@@ -90,26 +90,26 @@ public final class ShardingListenerManagerTest {
@Test
public void assertShardingTotalCountChangedJobListenerWhenIsConfigPathAndCurrentShardingTotalCountIsNotEqualToNewShardingTotalCount() {
JobRegistry.getInstance().setCurrentShardingTotalCount("test_job", 5);
- shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config", Type.NODE_UPDATED, LiteYamlConstants.getJobYaml());
+ shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged("/test_job/config", Type.NODE_CHANGED, LiteYamlConstants.getJobYaml());
verify(shardingService).setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount("test_job", 0);
}
@Test
public void assertListenServersChangedJobListenerWhenIsNotServerStatusPath() {
- shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/servers/127.0.0.1/other", Type.NODE_ADDED, "");
+ shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/servers/127.0.0.1/other", Type.NODE_CREATED, "");
verify(shardingService, times(0)).setReshardingFlag();
}
@Test
public void assertListenServersChangedJobListenerWhenIsServerStatusPathButUpdate() {
- shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/servers/127.0.0.1/status", Type.NODE_UPDATED, "");
+ shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/servers/127.0.0.1/status", Type.NODE_CHANGED, "");
verify(shardingService, times(0)).setReshardingFlag();
}
@Test
public void assertListenServersChangedJobListenerWhenIsInstanceChangeButJobInstanceIsShutdown() {
- shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/instances/xxx", Type.NODE_ADDED, "");
+ shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/instances/xxx", Type.NODE_CREATED, "");
verify(shardingService, times(0)).setReshardingFlag();
}
@@ -117,7 +117,7 @@ public final class ShardingListenerManagerTest {
public void assertListenServersChangedJobListenerWhenIsInstanceChange() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/instances/xxx", Type.NODE_ADDED, "");
+ shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/instances/xxx", Type.NODE_CREATED, "");
verify(shardingService).setReshardingFlag();
JobRegistry.getInstance().shutdown("test_job");
}
@@ -126,7 +126,7 @@ public final class ShardingListenerManagerTest {
public void assertListenServersChangedJobListenerWhenIsServerChange() {
JobRegistry.getInstance().registerRegistryCenter("test_job", regCenter);
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
- shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_UPDATED, "");
+ shardingListenerManager.new ListenServersChangedJobListener().dataChanged("/test_job/servers/127.0.0.1", Type.NODE_CHANGED, "");
verify(shardingService).setReshardingFlag();
JobRegistry.getInstance().shutdown("test_job");
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index f1818fa..9e82db0 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
@@ -32,8 +32,8 @@ import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleCo
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.TransactionExecutionCallback;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -235,31 +235,32 @@ public final class ShardingServiceTest {
assertFalse(shardingService.hasShardingInfoInOfflineServers());
}
+ @SuppressWarnings("unchecked")
@Test
public void assertPersistShardingInfoTransactionExecutionCallback() throws Exception {
- CuratorTransactionFinal curatorTransactionFinal = mock(CuratorTransactionFinal.class);
- TransactionCreateBuilder transactionCreateBuilder = mock(TransactionCreateBuilder.class);
- TransactionDeleteBuilder transactionDeleteBuilder = mock(TransactionDeleteBuilder.class);
- CuratorTransactionBridge curatorTransactionBridge = mock(CuratorTransactionBridge.class);
- when(curatorTransactionFinal.create()).thenReturn(transactionCreateBuilder);
- when(transactionCreateBuilder.forPath("/test_job/sharding/0/instance", "host0@-@0".getBytes())).thenReturn(curatorTransactionBridge);
- when(transactionCreateBuilder.forPath("/test_job/sharding/1/instance", "host0@-@0".getBytes())).thenReturn(curatorTransactionBridge);
- when(transactionCreateBuilder.forPath("/test_job/sharding/2/instance", "host0@-@0".getBytes())).thenReturn(curatorTransactionBridge);
- when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
- when(curatorTransactionFinal.delete()).thenReturn(transactionDeleteBuilder);
- when(transactionDeleteBuilder.forPath("/test_job/leader/sharding/necessary")).thenReturn(curatorTransactionBridge);
- when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
- when(curatorTransactionFinal.delete()).thenReturn(transactionDeleteBuilder);
- when(transactionDeleteBuilder.forPath("/test_job/leader/sharding/processing")).thenReturn(curatorTransactionBridge);
- when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
+ TransactionOp transactionOp = mock(TransactionOp.class);
+ TransactionCreateBuilder transactionCreateBuilder0 = mock(TransactionCreateBuilder.class);
+ TransactionCreateBuilder transactionCreateBuilder1 = mock(TransactionCreateBuilder.class);
+ TransactionCreateBuilder transactionCreateBuilder2 = mock(TransactionCreateBuilder.class);
+ when(transactionOp.create()).thenReturn(transactionCreateBuilder0, transactionCreateBuilder1, transactionCreateBuilder2);
+ CuratorOp createOp0 = mock(CuratorOp.class);
+ CuratorOp createOp1 = mock(CuratorOp.class);
+ CuratorOp createOp2 = mock(CuratorOp.class);
+ when(transactionCreateBuilder0.forPath("/test_job/sharding/0/instance", "host0@-@0".getBytes())).thenReturn(createOp0);
+ when(transactionCreateBuilder1.forPath("/test_job/sharding/1/instance", "host0@-@0".getBytes())).thenReturn(createOp1);
+ when(transactionCreateBuilder2.forPath("/test_job/sharding/2/instance", "host0@-@0".getBytes())).thenReturn(createOp2);
+ TransactionDeleteBuilder transactionNecessaryDeleteBuilder = mock(TransactionDeleteBuilder.class);
+ TransactionDeleteBuilder transactionProcessingDeleteBuilder = mock(TransactionDeleteBuilder.class);
+ when(transactionOp.delete()).thenReturn(transactionNecessaryDeleteBuilder, transactionProcessingDeleteBuilder);
+ CuratorOp necessaryDeleteOp = mock(CuratorOp.class);
+ when(transactionNecessaryDeleteBuilder.forPath("/test_job/leader/sharding/necessary")).thenReturn(necessaryDeleteOp);
+ CuratorOp processingDeleteOp = mock(CuratorOp.class);
+ when(transactionProcessingDeleteBuilder.forPath("/test_job/leader/sharding/processing")).thenReturn(processingDeleteOp);
Map<JobInstance, List<Integer>> shardingResult = new HashMap<>();
shardingResult.put(new JobInstance("host0@-@0"), Arrays.asList(0, 1, 2));
ShardingService.PersistShardingInfoTransactionExecutionCallback actual = shardingService.new PersistShardingInfoTransactionExecutionCallback(shardingResult);
- actual.execute(curatorTransactionFinal);
- verify(curatorTransactionFinal, times(3)).create();
- verify(curatorTransactionFinal, times(2)).delete();
- verify(transactionDeleteBuilder).forPath("/test_job/leader/sharding/necessary");
- verify(transactionDeleteBuilder).forPath("/test_job/leader/sharding/processing");
- verify(curatorTransactionBridge, times(5)).and();
+ assertThat(actual.createCuratorOperators(transactionOp), is(Arrays.asList(createOp0, createOp1, createOp2, necessaryDeleteOp, processingDeleteOp)));
+ verify(transactionOp, times(3)).create();
+ verify(transactionOp, times(2)).delete();
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
index a12729d..f025b03 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
@@ -18,17 +18,17 @@
package org.apache.shardingsphere.elasticjob.lite.internal.storage;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,6 +36,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Arrays;
+import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -159,51 +160,67 @@ public final class JobNodeStorageTest {
@Test
public void assertExecuteInTransactionSuccess() throws Exception {
CuratorFramework client = mock(CuratorFramework.class);
- CuratorTransaction curatorTransaction = mock(CuratorTransaction.class);
- TransactionCheckBuilder transactionCheckBuilder = mock(TransactionCheckBuilder.class);
- CuratorTransactionBridge curatorTransactionBridge = mock(CuratorTransactionBridge.class);
- CuratorTransactionFinal curatorTransactionFinal = mock(CuratorTransactionFinal.class);
when(regCenter.getRawClient()).thenReturn(client);
- when(client.inTransaction()).thenReturn(curatorTransaction);
- when(curatorTransaction.check()).thenReturn(transactionCheckBuilder);
- when(transactionCheckBuilder.forPath("/")).thenReturn(curatorTransactionBridge);
- when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
+ TransactionOp transactionOp = mockTransactionOp(client);
+ CuratorOp checkOp = mockCheckOp(transactionOp);
+ CuratorOp createOp = mockCreateOp(transactionOp);
+ CuratorMultiTransaction transaction = mockTransaction(client);
+ jobNodeStorage.executeInTransaction(input -> Collections.singletonList(input.create().forPath("/test_transaction")));
+ verify(transaction).forOperations(Arrays.asList(checkOp, createOp));
+ }
+
+ private TransactionOp mockTransactionOp(final CuratorFramework client) {
+ TransactionOp result = mock(TransactionOp.class);
+ when(client.transactionOp()).thenReturn(result);
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private CuratorOp mockCheckOp(final TransactionOp transactionOperation) throws Exception {
+ TransactionCheckBuilder transactionCheckBuilder = mock(TransactionCheckBuilder.class);
+ when(transactionOperation.check()).thenReturn(transactionCheckBuilder);
+ CuratorOp result = mock(CuratorOp.class);
+ when(transactionCheckBuilder.forPath("/")).thenReturn(result);
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private CuratorOp mockCreateOp(final TransactionOp transactionOperation) throws Exception {
TransactionCreateBuilder transactionCreateBuilder = mock(TransactionCreateBuilder.class);
- when(curatorTransactionFinal.create()).thenReturn(transactionCreateBuilder);
- when(transactionCreateBuilder.forPath("/test_transaction")).thenReturn(curatorTransactionBridge);
- when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
- jobNodeStorage.executeInTransaction(curatorTransactionFinalForCallback -> curatorTransactionFinalForCallback.create().forPath("/test_transaction").and());
- verify(regCenter).getRawClient();
- verify(client).inTransaction();
- verify(curatorTransaction).check();
- verify(transactionCheckBuilder).forPath("/");
- verify(curatorTransactionBridge, times(2)).and();
- verify(curatorTransactionFinal).create();
- verify(transactionCreateBuilder).forPath("/test_transaction");
- verify(curatorTransactionFinal).commit();
+ when(transactionOperation.create()).thenReturn(transactionCreateBuilder);
+ CuratorOp result = mock(CuratorOp.class);
+ when(transactionCreateBuilder.forPath("/test_transaction")).thenReturn(result);
+ return result;
}
-
+
+ private CuratorMultiTransaction mockTransaction(final CuratorFramework client) {
+ CuratorMultiTransaction result = mock(CuratorMultiTransaction.class);
+ when(client.transaction()).thenReturn(result);
+ return result;
+ }
+
@Test(expected = RuntimeException.class)
public void assertExecuteInTransactionFailure() throws Exception {
CuratorFramework client = mock(CuratorFramework.class);
- CuratorTransaction curatorTransaction = mock(CuratorTransaction.class);
- TransactionCheckBuilder transactionCheckBuilder = mock(TransactionCheckBuilder.class);
- CuratorTransactionBridge curatorTransactionBridge = mock(CuratorTransactionBridge.class);
- CuratorTransactionFinal curatorTransactionFinal = mock(CuratorTransactionFinal.class);
when(regCenter.getRawClient()).thenReturn(client);
- when(client.inTransaction()).thenReturn(curatorTransaction);
- when(curatorTransaction.check()).thenReturn(transactionCheckBuilder);
- when(transactionCheckBuilder.forPath("/")).thenReturn(curatorTransactionBridge);
- when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
- when(curatorTransactionBridge.and()).thenThrow(new RuntimeException());
- jobNodeStorage.executeInTransaction(curatorTransactionFinalForCallback -> curatorTransactionFinalForCallback.create().forPath("/test_transaction").and());
- verify(regCenter).getRawClient();
- verify(client).inTransaction();
- verify(curatorTransaction).check();
- verify(transactionCheckBuilder).forPath("/");
- verify(curatorTransactionBridge, times(2)).and();
- verify(curatorTransactionFinal).create();
- verify(curatorTransactionFinal, times(0)).commit();
+ TransactionOp transactionOp = mockTransactionOp(client);
+ CuratorOp checkOp = mockCheckOp(transactionOp);
+ CuratorOp createFailedOp = mockCreateFailedOp(transactionOp);
+ CuratorMultiTransaction transaction = mockTransaction(client);
+ try {
+ jobNodeStorage.executeInTransaction(input -> Collections.singletonList(input.create().forPath("/test_transaction")));
+ } finally {
+ verify(transaction, times(0)).forOperations(Arrays.asList(checkOp, createFailedOp));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private CuratorOp mockCreateFailedOp(final TransactionOp transactionOperation) throws Exception {
+ TransactionCreateBuilder transactionCreateBuilder = mock(TransactionCreateBuilder.class);
+ when(transactionOperation.create()).thenReturn(transactionCreateBuilder);
+ CuratorOp result = mock(CuratorOp.class);
+ when(transactionCreateBuilder.forPath("/test_transaction")).thenThrow(new RuntimeException());
+ return result;
}
@Test
@@ -220,12 +237,12 @@ public final class JobNodeStorageTest {
@Test
public void assertAddDataListener() {
- TreeCache treeCache = mock(TreeCache.class);
+ CuratorCache cache = mock(CuratorCache.class);
@SuppressWarnings("unchecked")
- Listenable<TreeCacheListener> listeners = mock(Listenable.class);
- TreeCacheListener listener = mock(TreeCacheListener.class);
- when(treeCache.getListenable()).thenReturn(listeners);
- when(regCenter.getRawCache("/test_job")).thenReturn(treeCache);
+ Listenable<CuratorCacheListener> listeners = mock(Listenable.class);
+ CuratorCacheListener listener = mock(CuratorCacheListener.class);
+ when(cache.listenable()).thenReturn(listeners);
+ when(regCenter.getRawCache("/test_job")).thenReturn(cache);
jobNodeStorage.addDataListener(listener);
verify(listeners).addListener(listener);
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/reg/RegistryCenterFactory.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/reg/RegistryCenterFactory.java
index 408e32e..b9c660d 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/reg/RegistryCenterFactory.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/reg/RegistryCenterFactory.java
@@ -47,7 +47,7 @@ public final class RegistryCenterFactory {
* @return registry center
*/
public static CoordinatorRegistryCenter createCoordinatorRegistryCenter(final String connectString, final String namespace, final String digest) {
- Hasher hasher = Hashing.md5().newHasher().putString(connectString, Charsets.UTF_8).putString(namespace, Charsets.UTF_8);
+ Hasher hasher = Hashing.sha256().newHasher().putString(connectString, Charsets.UTF_8).putString(namespace, Charsets.UTF_8);
if (!Strings.isNullOrEmpty(digest)) {
hasher.putString(digest, Charsets.UTF_8);
}
diff --git a/examples/pom.xml b/examples/pom.xml
index 1ea4e3b..65b9e44 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -38,7 +38,7 @@
<properties>
<java.version>1.8</java.version>
<elastic-job.version>3.0.0.M1-SNAPSHOT</elastic-job.version>
- <curator.version>2.10.0</curator.version>
+ <curator.version>5.1.0</curator.version>
<springframework.version>4.3.4.RELEASE</springframework.version>
<slf4j.version>1.7.7</slf4j.version>
<logback.version>1.1.2</logback.version>
diff --git a/pom.xml b/pom.xml
index e92d527..feac769 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,10 +47,10 @@
<springframework.version>[3.1.0.RELEASE,)</springframework.version>
<springboot.version>2.3.1.RELEASE</springboot.version>
- <guava.version>18.0</guava.version>
+ <guava.version>29.0-jre</guava.version>
<commons-lang3.version>3.4</commons-lang3.version>
<quartz.version>2.3.2</quartz.version>
- <curator.version>2.10.0</curator.version>
+ <curator.version>5.1.0</curator.version>
<lombok.version>1.18.12</lombok.version>
<aspectj.version>1.9.1</aspectj.version>
<slf4j.version>1.7.7</slf4j.version>