You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/30 07:19:48 UTC

[shardingsphere] branch master updated: Consul repository support use localhost consul agent if not config server list (#21846)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bc29ef52da Consul repository support use localhost consul agent if not config server list (#21846)
0bc29ef52da is described below

commit 0bc29ef52da237a2383b56d1311d82851568c27f
Author: GavinPeng <ro...@163.com>
AuthorDate: Sun Oct 30 15:19:39 2022 +0800

    Consul repository support use localhost consul agent if not config server list (#21846)
    
    * one:remove the code with lock no use
    two:support use local consul agent and improves unit test performance
    
    * modify delay commit session flush ttl time
    
    Co-authored-by: gavin.peng <ga...@ximalaya.com>
---
 .../cluster/consul/ConsulRepository.java           | 31 +++++++++++--
 .../consul/lock/ConsulDistributedLockHolder.java   | 15 ------
 .../cluster/consul/ConsulRepositoryTest.java       | 53 +++++++++++++---------
 3 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
index 8c541263fa6..50ced8431be 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.mode.repository.cluster.consul;
 
+import com.ecwid.consul.v1.ConsulClient;
 import com.ecwid.consul.v1.ConsulRawClient;
 import com.ecwid.consul.v1.QueryParams;
 import com.ecwid.consul.v1.Response;
@@ -39,6 +40,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -46,6 +49,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class ConsulRepository implements ClusterPersistRepository {
     
+    private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
+    
     private ShardingSphereConsulClient consulClient;
     
     private ConsulProperties consulProps;
@@ -56,7 +61,14 @@ public class ConsulRepository implements ClusterPersistRepository {
     
     @Override
     public void init(final ClusterPersistRepositoryConfiguration config, final InstanceMetaData instanceMetaData) {
-        consulClient = new ShardingSphereConsulClient(new ConsulRawClient(config.getServerLists()));
+        ConsulRawClient rawClient;
+        String serverList = config.getServerLists();
+        if (serverList == null || serverList.length() <= 0) {
+            rawClient = new ConsulRawClient();
+        } else {
+            rawClient = new ConsulRawClient(serverList);
+        }
+        consulClient = new ShardingSphereConsulClient(rawClient);
         consulProps = new ConsulProperties(config.getProps());
         consulDistributedLockHolder = new ConsulDistributedLockHolder(consulClient, consulProps);
         watchKeyMap = new HashMap<>(6, 1);
@@ -76,7 +88,8 @@ public class ConsulRepository implements ClusterPersistRepository {
     
     @Override
     public boolean isExisted(final String key) {
-        return false;
+        Response<GetValue> response = consulClient.getKVValue(key);
+        return response.getValue() == null ? false : true;
     }
     
     @Override
@@ -86,7 +99,7 @@ public class ConsulRepository implements ClusterPersistRepository {
     
     @Override
     public void update(final String key, final String value) {
-        // TODO
+        consulClient.setKVValue(key, value);
     }
     
     @Override
@@ -106,7 +119,7 @@ public class ConsulRepository implements ClusterPersistRepository {
         PutParams putParams = new PutParams();
         putParams.setAcquireSession(sessionId);
         consulClient.setKVValue(key, value, putParams);
-        consulDistributedLockHolder.generatorFlushSessionTtlTask(consulClient, sessionId);
+        generatorFlushSessionTtlTask(consulClient, sessionId);
     }
     
     private NewSession createNewSession(final String key) {
@@ -187,6 +200,16 @@ public class ConsulRepository implements ClusterPersistRepository {
         listener.onChange(new DataChangedEvent(getValue.getKey(), getValue.getValue(), type));
     }
     
+    /**
+     * Flush session by update TTL.
+     *
+     * @param consulClient consul client
+     * @param sessionId session id
+     */
+    public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
+        SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
+    }
+    
     @Override
     public String getType() {
         return "Consul";
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
index cd67c6eb4f2..3cc65df7797 100644
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
+++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockHolder.java
@@ -18,18 +18,14 @@
 package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
 
 import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.QueryParams;
 import com.ecwid.consul.v1.session.model.NewSession;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
 import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLockHolder;
-
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Consul distributed lock holder.
@@ -38,8 +34,6 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public class ConsulDistributedLockHolder implements DistributedLockHolder {
     
-    private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
-    
     private final Map<String, ConsulDistributedLock> locks = new ConcurrentHashMap<>();
     
     private final ConsulClient consulClient;
@@ -69,13 +63,4 @@ public class ConsulDistributedLockHolder implements DistributedLockHolder {
         return null;
     }
     
-    /**
-     * Flush session by update TTL.
-     * 
-     * @param consulClient consul client
-     * @param sessionId session id
-     */
-    public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
-        SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
-    }
 }
diff --git a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
index 7a626d7f6d1..b9a9c97f3f8 100644
--- a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++ b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
@@ -26,10 +26,10 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulDistributedLockHolder;
 import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.exceptions.base.MockitoException;
 import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.plugins.MemberAccessor;
@@ -69,16 +69,16 @@ public final class ConsulRepositoryTest {
     @Mock
     private Response<Boolean> responseBoolean;
     
-    // @Mock
-    // private Response<String> sessionResponse;
+    @Mock
+    private Response<String> sessionResponse;
     
     @Mock
     private GetValue getValue;
     
-    // @Mock
-    // private List<GetValue> getValueList;
-    //
-    // private long index = 123456L;
+    @Mock
+    private List<GetValue> getValueList;
+    
+    private long index = 123456L;
     
     @Before
     public void setUp() {
@@ -90,12 +90,12 @@ public final class ConsulRepositoryTest {
     private void setClient() {
         when(client.getKVValue(any(String.class))).thenReturn(response);
         when(response.getValue()).thenReturn(getValue);
-        // when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
+        when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
         when(client.getKVKeysOnly(any(String.class))).thenReturn(responseList);
-        // when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
-        // when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
-        // when(responseGetValueList.getConsulIndex()).thenReturn(index++);
-        // when(responseGetValueList.getValue()).thenReturn(getValueList);
+        when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
+        when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
+        when(responseGetValueList.getConsulIndex()).thenReturn(index++);
+        when(responseGetValueList.getValue()).thenReturn(getValueList);
         when(client.setKVValue(any(String.class), any(String.class))).thenReturn(responseBoolean);
         Plugins.getMemberAccessor().set(repository.getClass().getDeclaredField("consulClient"), repository, client);
         Plugins.getMemberAccessor().set(repository.getClass().getDeclaredField("consulDistributedLockHolder"), repository, mock(ConsulDistributedLockHolder.class));
@@ -109,7 +109,7 @@ public final class ConsulRepositoryTest {
     }
     
     @Test
-    public void assertGetKey() {
+    public void assertDirectlyKey() {
         repository.getDirectly("key");
         verify(client).getKVValue("key");
         verify(response).getValue();
@@ -134,17 +134,13 @@ public final class ConsulRepositoryTest {
     }
     
     @Test
-    @Ignore
     public void assertPersistEphemeral() throws InterruptedException {
         repository.persistEphemeral("key1", "value1");
         verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
         verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
-        Thread.sleep(6000L);
-        verify(client).renewSession(any(String.class), any(QueryParams.class));
     }
     
     @Test
-    @Ignore
     public void assertWatchUpdate() throws InterruptedException {
         final String key = "sharding/key";
         final String k1 = "sharding/key/key1";
@@ -157,12 +153,18 @@ public final class ConsulRepositoryTest {
         repository.watch(key, event -> {
         });
         client.setKVValue(k1, "value1-1");
-        verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
-        Thread.sleep(10000L);
+        while (true) {
+            Thread.sleep(100L);
+            try {
+                verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+                break;
+            } catch (MockitoException e) {
+                continue;
+            }
+        }
     }
     
     @Test
-    @Ignore
     public void assertWatchDelete() throws InterruptedException {
         final String key = "sharding/key";
         final String k1 = "sharding/key/key1";
@@ -178,8 +180,15 @@ public final class ConsulRepositoryTest {
         repository.watch(key, event -> {
         });
         client.deleteKVValue(k2);
-        verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
-        Thread.sleep(10000L);
+        while (true) {
+            Thread.sleep(100L);
+            try {
+                verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
+                break;
+            } catch (MockitoException e) {
+                continue;
+            }
+        }
     }
     
     @Test