You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/03/17 11:17:08 UTC

[shardingsphere] branch master updated: Remove service lock for proxy & jdbc (#9709)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c861cba  Remove service lock for proxy & jdbc (#9709)
c861cba is described below

commit c861cba6ab706f4d49243fca45fa4424b5f3e690
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Wed Mar 17 19:16:21 2021 +0800

    Remove service lock for proxy & jdbc (#9709)
---
 .../governance/core/lock/GovernanceLock.java       | 35 ++++++++++++--
 .../governance/core/lock/node/LockNode.java        | 14 +++---
 .../governance/core/registry/RegistryCenter.java   | 36 ++++++++++----
 .../governance/core/lock/GovernanceLockTest.java   | 10 +++-
 .../governance/core/lock/node/LockNodeTest.java    |  4 +-
 .../core/registry/RegistryCenterTest.java          |  8 ++--
 .../infra/lock/AbstractShardingSphereLock.java     | 55 ----------------------
 .../infra/lock/ShardingSphereLock.java             | 26 ++++++----
 .../internal/state/impl/LockDriverState.java       | 14 +-----
 .../proxy/frontend/state/impl/LockProxyState.java  | 24 +---------
 10 files changed, 98 insertions(+), 128 deletions(-)

diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java
index 175d75e..6ece835 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java
@@ -17,18 +17,23 @@
 
 package org.apache.shardingsphere.governance.core.lock;
 
+import com.google.common.base.Joiner;
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.governance.core.event.model.lock.LockNotificationEvent;
 import org.apache.shardingsphere.governance.core.event.model.lock.LockReleasedEvent;
 import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
 import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.AbstractShardingSphereLock;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
 
 /**
  * Governance lock.
  */
-public final class GovernanceLock extends AbstractShardingSphereLock {
+public final class GovernanceLock implements ShardingSphereLock {
     
     private final RegistryCenter registryCenter;
     
@@ -39,12 +44,34 @@ public final class GovernanceLock extends AbstractShardingSphereLock {
     
     @Override
     public boolean tryLock(final String schemaName, final String tableName, final long timeoutMilliseconds) {
-        return registryCenter.tryLock(schemaName, tableName, timeoutMilliseconds);
+        boolean result = registryCenter.tryLock(timeoutMilliseconds);
+        if (result) {
+            registryCenter.addLockedResources(Arrays.asList(Joiner.on(".").join(schemaName, tableName)));
+        }
+        return result;
+    }
+    
+    @Override
+    public boolean tryLock(final String schemaName, final Collection<String> tableNames, final long timeoutMilliseconds) {
+        boolean result = registryCenter.tryLock(timeoutMilliseconds);
+        if (result) {
+            registryCenter.addLockedResources(tableNames.stream()
+                    .map(each -> Joiner.on(".").join(schemaName, each)).collect(Collectors.toList()));
+        }
+        return result;
     }
     
     @Override
     public void releaseLock(final String schemaName, final String tableName) {
-        registryCenter.releaseLock(schemaName, tableName);
+        registryCenter.releaseLock();
+        registryCenter.deleteLockedResources(Arrays.asList(Joiner.on(".").join(schemaName, tableName)));
+    }
+    
+    @Override
+    public void releaseLock(final String schemaName, final Collection<String> tableNames) {
+        registryCenter.releaseLock();
+        registryCenter.deleteLockedResources(tableNames.stream()
+                .map(each -> Joiner.on(".").join(schemaName, each)).collect(Collectors.toList()));
     }
     
     /**
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
index b62b94e..aa4d52c 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
@@ -28,6 +28,8 @@ public final class LockNode {
     
     private static final String LOCK_NODE = "glock";
     
+    private static final String LOCKED_RESOURCES_NODE = "locked_resources";
+    
     /**
      * Get lock node path.
      * 
@@ -38,13 +40,11 @@ public final class LockNode {
     }
     
     /**
-     * Get table lock node path.
-     * 
-     * @param schemaName schema name
-     * @param tableName table name
-     * @return table lock node path
+     * Get locked resources node path.
+     *
+     * @return locked resources node path
      */
-    public String getTableLockNodePath(final String schemaName, final String tableName) {
-        return Joiner.on("/").join("", LOCK_NODE_ROOT, LOCK_NODE, schemaName, tableName);
+    public String getLockedResourcesNodePath() {
+        return Joiner.on("/").join("", LOCK_NODE_ROOT, LOCKED_RESOURCES_NODE);
     }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 2d9c7bd..b0b8b9e 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -531,22 +531,40 @@ public final class RegistryCenter {
     
     /**
      * Try to get lock.
-     * 
-     * @param schemaName schema name
-     * @param tableName table name
+     *
      * @param timeout the maximum time in milliseconds to acquire lock
      * @return true if get the lock, false if not
      */
-    public boolean tryLock(final String schemaName, final String tableName, final long timeout) {
-        return repository.tryLock(lockNode.getTableLockNodePath(schemaName, tableName), timeout, TimeUnit.MILLISECONDS);
+    public boolean tryLock(final long timeout) {
+        return repository.tryLock(lockNode.getLockNodePath(), timeout, TimeUnit.MILLISECONDS);
     }
     
     /**
      * Release lock.
-     * @param schemaName schema name
-     * @param tableName table name
      */
-    public void releaseLock(final String schemaName, final String tableName) {
-        repository.releaseLock(lockNode.getTableLockNodePath(schemaName, tableName));
+    public void releaseLock() {
+        repository.releaseLock(lockNode.getLockNodePath());
+    }
+    
+    /**
+     * Add locked resources.
+     * 
+     * @param resources collection of resources
+     */
+    public void addLockedResources(final Collection<String> resources) {
+        List<String> lockedResources = Splitter.on(",").splitToList(repository.get(lockNode.getLockedResourcesNodePath()));
+        lockedResources.addAll(resources);
+        repository.persist(lockNode.getLockedResourcesNodePath(), Joiner.on(",").join(lockedResources));
+    }
+    
+    /**
+     * Delete locked resources.
+     *
+     * @param resources collection of resources
+     */
+    public void deleteLockedResources(final Collection<String> resources) {
+        List<String> lockedResources = Splitter.on(",").splitToList(repository.get(lockNode.getLockedResourcesNodePath()));
+        lockedResources.removeAll(resources);
+        repository.persist(lockNode.getLockedResourcesNodePath(), Joiner.on(",").join(lockedResources));
     }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java
index ad63cbe..2b47119 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java
@@ -25,8 +25,11 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Arrays;
+
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class GovernanceLockTest {
@@ -43,13 +46,16 @@ public final class GovernanceLockTest {
     
     @Test
     public void assertTryLock() {
+        when(registryCenter.tryLock(eq(50L))).thenReturn(Boolean.TRUE);
         lock.tryLock("sharding_db", "t_order", 50L);
-        verify(registryCenter).tryLock(eq("sharding_db"), eq("t_order"), eq(50L));
+        verify(registryCenter).tryLock(eq(50L));
+        verify(registryCenter).addLockedResources(eq(Arrays.asList("sharding_db.t_order")));
     }
     
     @Test
     public void assertReleaseLock() {
         lock.releaseLock("sharding_db", "t_order");
-        verify(registryCenter).releaseLock(eq("sharding_db"), eq("t_order"));
+        verify(registryCenter).releaseLock();
+        verify(registryCenter).deleteLockedResources(eq(Arrays.asList("sharding_db.t_order")));
     }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
index 39d0a55..38d7a86 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
@@ -38,7 +38,7 @@ public final class LockNodeTest {
     }
     
     @Test
-    public void assertGetTableLockNodePath() {
-        assertThat(lockNode.getTableLockNodePath("sharding_db", "t_order"), is("/lock/glock/sharding_db/t_order"));
+    public void assertGetLockedResourceNodePath() {
+        assertThat(lockNode.getLockedResourcesNodePath(), is("/lock/locked_resources"));
     }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
index 240fd1d..ded1b48 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
@@ -161,14 +161,14 @@ public final class RegistryCenterTest {
     
     @Test
     public void assertTryLock() {
-        registryCenter.tryLock("sharding_db", "t_order", 50L);
-        verify(registryRepository).tryLock(eq(new LockNode().getTableLockNodePath("sharding_db", "t_order")), eq(50L), eq(TimeUnit.MILLISECONDS));
+        registryCenter.tryLock(50L);
+        verify(registryRepository).tryLock(eq(new LockNode().getLockNodePath()), eq(50L), eq(TimeUnit.MILLISECONDS));
     }
     
     @Test
     public void assertReleaseLock() {
-        registryCenter.releaseLock("sharding_db", "t_order");
-        verify(registryRepository).releaseLock(eq(new LockNode().getTableLockNodePath("sharding_db", "t_order")));
+        registryCenter.releaseLock();
+        verify(registryRepository).releaseLock(eq(new LockNode().getLockNodePath()));
     }
     
     @Test
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/AbstractShardingSphereLock.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/AbstractShardingSphereLock.java
deleted file mode 100644
index 6d7e569..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/AbstractShardingSphereLock.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.lock;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Abstract ShardingSphere lock.
- */
-public abstract class AbstractShardingSphereLock implements ShardingSphereLock {
-    
-    private final Lock lock = new ReentrantLock();
-    
-    private final Condition condition = lock.newCondition();
-    
-    @Override
-    public final boolean await(final Long timeoutMilliseconds) {
-        lock.lock();
-        try {
-            return condition.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
-        } catch (final InterruptedException ignored) {
-        } finally {
-            lock.unlock();
-        }
-        return false;
-    }
-    
-    @Override
-    public final void signalAll() {
-        lock.lock();
-        try {
-            condition.signalAll();
-        } finally {
-            lock.unlock();
-        }
-    }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java
index d6d3044..c11fd40 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.infra.lock;
 
+import java.util.Collection;
+
 /**
  * ShardingSphere lock.
  */
@@ -33,22 +35,26 @@ public interface ShardingSphereLock {
     boolean tryLock(String schemaName, String tableName, long timeoutMilliseconds);
     
     /**
-     * Release lock.
+     * Try to lock.
+     *
      * @param schemaName schema name
-     * @param tableName table name
+     * @param tableNames collection of table names
+     * @param timeoutMilliseconds timeout in milliseconds to acquire lock
+     * @return true if get the lock, false if not
      */
-    void releaseLock(String schemaName, String tableName);
+    boolean tryLock(String schemaName, Collection<String> tableNames, long timeoutMilliseconds);
     
     /**
-     * Await lock.
-     * 
-     * @param timeoutMilliseconds time out milliseconds to await lock
-     * @return true if no exception
+     * Release lock.
+     * @param schemaName schema name
+     * @param tableName table name
      */
-    boolean await(Long timeoutMilliseconds);
+    void releaseLock(String schemaName, String tableName);
     
     /**
-     * Signal all.
+     * Release lock.
+     * @param schemaName schema name
+     * @param tableNames collection of table names
      */
-    void signalAll();
+    void releaseLock(String schemaName, Collection<String> tableNames);
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java
index d297f6a..76a621a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java
@@ -18,10 +18,7 @@
 package org.apache.shardingsphere.driver.governance.internal.state.impl;
 
 import org.apache.shardingsphere.driver.governance.internal.state.DriverState;
-import org.apache.shardingsphere.driver.governance.internal.state.DriverStateContext;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 
@@ -37,14 +34,7 @@ public final class LockDriverState implements DriverState {
     @Override
     public Connection getConnection(final Map<String, DataSource> dataSourceMap, 
                                     final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts, final TransactionType transactionType) {
-        block(metaDataContexts);
-        return DriverStateContext.getConnection(dataSourceMap, metaDataContexts, transactionContexts, transactionType);
-    }
-    
-    private void block(final MetaDataContexts metaDataContexts) {
-        long lockTimeoutMilliseconds = metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS);
-        if (metaDataContexts.getLock().isPresent() && !metaDataContexts.getLock().get().await(lockTimeoutMilliseconds)) {
-            throw new ShardingSphereException("Service lock wait timeout of %s ms exceeded", lockTimeoutMilliseconds); 
-        }
+        // TODO implement connection handling for the lock state now that the service-lock wait has been removed
+        throw new UnsupportedOperationException("LockDriverState");
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
index b730ad1..4d2be54 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
@@ -18,17 +18,9 @@
 package org.apache.shardingsphere.proxy.frontend.state.impl;
 
 import io.netty.channel.ChannelHandlerContext;
-import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.exception.BackendException;
-import org.apache.shardingsphere.proxy.backend.exception.LockWaitTimeoutException;
 import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
-import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
-
-import java.util.Optional;
 
 /**
  * Lock proxy state.
@@ -37,20 +29,6 @@ public final class LockProxyState implements ProxyState {
     
     @Override
     public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
-        block(context, databaseProtocolFrontendEngine);
-        ProxyStateContext.execute(context, message, databaseProtocolFrontendEngine, backendConnection);
-    }
-    
-    private void block(final ChannelHandlerContext context, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine) {
-        Long lockTimeoutMilliseconds = ProxyContext.getInstance().getMetaDataContexts().getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS);
-        if (ProxyContext.getInstance().getLock().isPresent() && !ProxyContext.getInstance().getLock().get().await(lockTimeoutMilliseconds)) {
-            doError(context, databaseProtocolFrontendEngine, new LockWaitTimeoutException(lockTimeoutMilliseconds));
-        }
-    }
-    
-    private void doError(final ChannelHandlerContext context, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendException backendException) {
-        context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(backendException));
-        Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
-        databasePacket.ifPresent(context::writeAndFlush);
+        // TODO handle messages received in the lock state; this method currently does nothing with them
     }
 }