You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/30 07:19:48 UTC
[shardingsphere] branch master updated: Consul repository support use localhost consul agent if not config server list (#21846)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0bc29ef52da Consul repository support use localhost consul agent if not config server list (#21846)
0bc29ef52da is described below
commit 0bc29ef52da237a2383b56d1311d82851568c27f
Author: GavinPeng <ro...@163.com>
AuthorDate: Sun Oct 30 15:19:39 2022 +0800
Consul repository support use localhost consul agent if not config server list (#21846)
* one:remove the code with lock no use
two:support use local consul agent and improves unit test performance
* modify delay commit session flush ttl time
Co-authored-by: gavin.peng <ga...@ximalaya.com>
---
.../cluster/consul/ConsulRepository.java | 31 +++++++++++--
.../consul/lock/ConsulDistributedLockHolder.java | 15 ------
.../cluster/consul/ConsulRepositoryTest.java | 53 +++++++++++++---------
3 files changed, 58 insertions(+), 41 deletions(-)
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 8c541263fa6..50ced8431be 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.repository.cluster.consul;
+import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
@@ -39,6 +40,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -46,6 +49,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ConsulRepository implements ClusterPersistRepository {
+ private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
+
private ShardingSphereConsulClient consulClient;
private ConsulProperties consulProps;
@@ -56,7 +61,14 @@ public class ConsulRepository implements ClusterPersistRepository {
@Override
public void init(final ClusterPersistRepositoryConfiguration config, final InstanceMetaData instanceMetaData) {
- consulClient = new ShardingSphereConsulClient(new ConsulRawClient(config.getServerLists()));
+ ConsulRawClient rawClient;
+ String serverList = config.getServerLists();
+ if (serverList == null || serverList.length() <= 0) {
+ rawClient = new ConsulRawClient();
+ } else {
+ rawClient = new ConsulRawClient(serverList);
+ }
+ consulClient = new ShardingSphereConsulClient(rawClient);
consulProps = new ConsulProperties(config.getProps());
consulDistributedLockHolder = new ConsulDistributedLockHolder(consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1);
@@ -76,7 +88,8 @@ public class ConsulRepository implements ClusterPersistRepository {
@Override
public boolean isExisted(final String key) {
- return false;
+ Response<GetValue> response = consulClient.getKVValue(key);
+ return response.getValue() == null ? false : true;
}
@Override
@@ -86,7 +99,7 @@ public class ConsulRepository implements ClusterPersistRepository {
@Override
public void update(final String key, final String value) {
- // TODO
+ consulClient.setKVValue(key, value);
}
@Override
@@ -106,7 +119,7 @@ public class ConsulRepository implements ClusterPersistRepository {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
- consulDistributedLockHolder.generatorFlushSessionTtlTask(consulClient, sessionId);
+ generatorFlushSessionTtlTask(consulClient, sessionId);
}
private NewSession createNewSession(final String key) {
@@ -187,6 +200,16 @@ public class ConsulRepository implements ClusterPersistRepository {
listener.onChange(new DataChangedEvent(getValue.getKey(), getValue.getValue(), type));
}
+ /**
+ * Flush session by update TTL.
+ *
+ * @param consulClient consul client
+ * @param sessionId session id
+ */
+ public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
+ SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
+ }
+
@Override
public String getType() {
return "Consul";
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
index cd67c6eb4f2..3cc65df7797 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
@@ -18,18 +18,14 @@
package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.session.model.NewSession;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLockHolder;
-
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Consul distributed lock holder.
@@ -38,8 +34,6 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class ConsulDistributedLockHolder implements DistributedLockHolder {
- private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
-
private final Map<String, ConsulDistributedLock> locks = new ConcurrentHashMap<>();
private final ConsulClient consulClient;
@@ -69,13 +63,4 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
return null;
}
- /**
- * Flush session by update TTL.
- *
- * @param consulClient consul client
- * @param sessionId session id
- */
- public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
- SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
- }
}
diff --git a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
index 7a626d7f6d1..b9a9c97f3f8 100644
--- a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++ b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -26,10 +26,10 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulDistributedLockHolder;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.exceptions.base.MockitoException;
import org.mockito.internal.configuration.plugins.Plugins;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.plugins.MemberAccessor;
@@ -69,16 +69,16 @@ public final class ConsulRepositoryTest {
@Mock
private Response<Boolean> responseBoolean;
- // @Mock
- // private Response<String> sessionResponse;
+ @Mock
+ private Response<String> sessionResponse;
@Mock
private GetValue getValue;
- // @Mock
- // private List<GetValue> getValueList;
- //
- // private long index = 123456L;
+ @Mock
+ private List<GetValue> getValueList;
+
+ private long index = 123456L;
@Before
public void setUp() {
@@ -90,12 +90,12 @@ public final class ConsulRepositoryTest {
private void setClient() {
when(client.getKVValue(any(String.class))).thenReturn(response);
when(response.getValue()).thenReturn(getValue);
- // when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
+ when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
when(client.getKVKeysOnly(any(String.class))).thenReturn(responseList);
- // when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
- // when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
- // when(responseGetValueList.getConsulIndex()).thenReturn(index++);
- // when(responseGetValueList.getValue()).thenReturn(getValueList);
+ when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
+ when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
+ when(responseGetValueList.getConsulIndex()).thenReturn(index++);
+ when(responseGetValueList.getValue()).thenReturn(getValueList);
when(client.setKVValue(any(String.class), any(String.class))).thenReturn(responseBoolean);
Plugins.getMemberAccessor().set(repository.getClass().getDeclaredField("consulClient"), repository, client);
Plugins.getMemberAccessor().set(repository.getClass().getDeclaredField("consulDistributedLockHolder"), repository, mock(ConsulDistributedLockHolder.class));
@@ -109,7 +109,7 @@ public final class ConsulRepositoryTest {
}
@Test
- public void assertGetKey() {
+ public void assertDirectlyKey() {
repository.getDirectly("key");
verify(client).getKVValue("key");
verify(response).getValue();
@@ -134,17 +134,13 @@ public final class ConsulRepositoryTest {
}
@Test
- @Ignore
public void assertPersistEphemeral() throws InterruptedException {
repository.persistEphemeral("key1", "value1");
verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
- Thread.sleep(6000L);
- verify(client).renewSession(any(String.class), any(QueryParams.class));
}
@Test
- @Ignore
public void assertWatchUpdate() throws InterruptedException {
final String key = "sharding/key";
final String k1 = "sharding/key/key1";
@@ -157,12 +153,18 @@ public final class ConsulRepositoryTest {
repository.watch(key, event -> {
});
client.setKVValue(k1, "value1-1");
- verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
- Thread.sleep(10000L);
+ while (true) {
+ Thread.sleep(100L);
+ try {
+ verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+ break;
+ } catch (MockitoException e) {
+ continue;
+ }
+ }
}
@Test
- @Ignore
public void assertWatchDelete() throws InterruptedException {
final String key = "sharding/key";
final String k1 = "sharding/key/key1";
@@ -178,8 +180,15 @@ public final class ConsulRepositoryTest {
repository.watch(key, event -> {
});
client.deleteKVValue(k2);
- verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
- Thread.sleep(10000L);
+ while (true) {
+ Thread.sleep(100L);
+ try {
+ verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+ break;
+ } catch (MockitoException e) {
+ continue;
+ }
+ }
}
@Test