You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/03/17 11:17:08 UTC
[shardingsphere] branch master updated: Remove service lock for proxy & jdbc (#9709)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c861cba Remove service lock for proxy & jdbc (#9709)
c861cba is described below
commit c861cba6ab706f4d49243fca45fa4424b5f3e690
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Wed Mar 17 19:16:21 2021 +0800
Remove service lock for proxy & jdbc (#9709)
---
.../governance/core/lock/GovernanceLock.java | 35 ++++++++++++--
.../governance/core/lock/node/LockNode.java | 14 +++---
.../governance/core/registry/RegistryCenter.java | 36 ++++++++++----
.../governance/core/lock/GovernanceLockTest.java | 10 +++-
.../governance/core/lock/node/LockNodeTest.java | 4 +-
.../core/registry/RegistryCenterTest.java | 8 ++--
.../infra/lock/AbstractShardingSphereLock.java | 55 ----------------------
.../infra/lock/ShardingSphereLock.java | 26 ++++++----
.../internal/state/impl/LockDriverState.java | 14 +-----
.../proxy/frontend/state/impl/LockProxyState.java | 24 +---------
10 files changed, 98 insertions(+), 128 deletions(-)
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java
index 175d75e..6ece835 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLock.java
@@ -17,18 +17,23 @@
package org.apache.shardingsphere.governance.core.lock;
+import com.google.common.base.Joiner;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.governance.core.event.model.lock.LockNotificationEvent;
import org.apache.shardingsphere.governance.core.event.model.lock.LockReleasedEvent;
import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.AbstractShardingSphereLock;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
/**
* Governance lock.
*/
-public final class GovernanceLock extends AbstractShardingSphereLock {
+public final class GovernanceLock implements ShardingSphereLock {
private final RegistryCenter registryCenter;
@@ -39,12 +44,34 @@ public final class GovernanceLock extends AbstractShardingSphereLock {
@Override
public boolean tryLock(final String schemaName, final String tableName, final long timeoutMilliseconds) {
- return registryCenter.tryLock(schemaName, tableName, timeoutMilliseconds);
+ boolean result = registryCenter.tryLock(timeoutMilliseconds);
+ if (result) {
+ registryCenter.addLockedResources(Arrays.asList(Joiner.on(".").join(schemaName, tableName)));
+ }
+ return result;
+ }
+
+ @Override
+ public boolean tryLock(final String schemaName, final Collection<String> tableNames, final long timeoutMilliseconds) {
+ boolean result = registryCenter.tryLock(timeoutMilliseconds);
+ if (result) {
+ registryCenter.addLockedResources(tableNames.stream()
+ .map(each -> Joiner.on(".").join(schemaName, each)).collect(Collectors.toList()));
+ }
+ return result;
}
@Override
public void releaseLock(final String schemaName, final String tableName) {
- registryCenter.releaseLock(schemaName, tableName);
+ registryCenter.releaseLock();
+ registryCenter.deleteLockedResources(Arrays.asList(Joiner.on(".").join(schemaName, tableName)));
+ }
+
+ @Override
+ public void releaseLock(final String schemaName, final Collection<String> tableNames) {
+ registryCenter.releaseLock();
+ registryCenter.deleteLockedResources(tableNames.stream()
+ .map(each -> Joiner.on(".").join(schemaName, each)).collect(Collectors.toList()));
}
/**
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
index b62b94e..aa4d52c 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
@@ -28,6 +28,8 @@ public final class LockNode {
private static final String LOCK_NODE = "glock";
+ private static final String LOCKED_RESOURCES_NODE = "locked_resources";
+
/**
* Get lock node path.
*
@@ -38,13 +40,11 @@ public final class LockNode {
}
/**
- * Get table lock node path.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @return table lock node path
+ * Get locked resources node path.
+ *
+ * @return locked resources node path
*/
- public String getTableLockNodePath(final String schemaName, final String tableName) {
- return Joiner.on("/").join("", LOCK_NODE_ROOT, LOCK_NODE, schemaName, tableName);
+ public String getLockedResourcesNodePath() {
+ return Joiner.on("/").join("", LOCK_NODE_ROOT, LOCKED_RESOURCES_NODE);
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 2d9c7bd..b0b8b9e 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -531,22 +531,40 @@ public final class RegistryCenter {
/**
* Try to get lock.
- *
- * @param schemaName schema name
- * @param tableName table name
+ *
* @param timeout the maximum time in milliseconds to acquire lock
* @return true if get the lock, false if not
*/
- public boolean tryLock(final String schemaName, final String tableName, final long timeout) {
- return repository.tryLock(lockNode.getTableLockNodePath(schemaName, tableName), timeout, TimeUnit.MILLISECONDS);
+ public boolean tryLock(final long timeout) {
+ return repository.tryLock(lockNode.getLockNodePath(), timeout, TimeUnit.MILLISECONDS);
}
/**
* Release lock.
- * @param schemaName schema name
- * @param tableName table name
*/
- public void releaseLock(final String schemaName, final String tableName) {
- repository.releaseLock(lockNode.getTableLockNodePath(schemaName, tableName));
+ public void releaseLock() {
+ repository.releaseLock(lockNode.getLockNodePath());
+ }
+
+ /**
+ * Add locked resources.
+ *
+ * @param resources collection of resources
+ */
+ public void addLockedResources(final Collection<String> resources) {
+ List<String> lockedResources = Splitter.on(",").splitToList(repository.get(lockNode.getLockedResourcesNodePath()));
+ lockedResources.addAll(resources);
+ repository.persist(lockNode.getLockedResourcesNodePath(), Joiner.on(",").join(lockedResources));
+ }
+
+ /**
+ * Delete locked resources.
+ *
+ * @param resources collection of resources
+ */
+ public void deleteLockedResources(final Collection<String> resources) {
+ List<String> lockedResources = Splitter.on(",").splitToList(repository.get(lockNode.getLockedResourcesNodePath()));
+ lockedResources.removeAll(resources);
+ repository.persist(lockNode.getLockedResourcesNodePath(), Joiner.on(",").join(lockedResources));
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java
index ad63cbe..2b47119 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockTest.java
@@ -25,8 +25,11 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Arrays;
+
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class GovernanceLockTest {
@@ -43,13 +46,16 @@ public final class GovernanceLockTest {
@Test
public void assertTryLock() {
+ when(registryCenter.tryLock(eq(50L))).thenReturn(Boolean.TRUE);
lock.tryLock("sharding_db", "t_order", 50L);
- verify(registryCenter).tryLock(eq("sharding_db"), eq("t_order"), eq(50L));
+ verify(registryCenter).tryLock(eq(50L));
+ verify(registryCenter).addLockedResources(eq(Arrays.asList("sharding_db.t_order")));
}
@Test
public void assertReleaseLock() {
lock.releaseLock("sharding_db", "t_order");
- verify(registryCenter).releaseLock(eq("sharding_db"), eq("t_order"));
+ verify(registryCenter).releaseLock();
+ verify(registryCenter).deleteLockedResources(eq(Arrays.asList("sharding_db.t_order")));
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
index 39d0a55..38d7a86 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
@@ -38,7 +38,7 @@ public final class LockNodeTest {
}
@Test
- public void assertGetTableLockNodePath() {
- assertThat(lockNode.getTableLockNodePath("sharding_db", "t_order"), is("/lock/glock/sharding_db/t_order"));
+ public void assertGetLockedResourceNodePath() {
+ assertThat(lockNode.getLockedResourcesNodePath(), is("/lock/locked_resources"));
}
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
index 240fd1d..ded1b48 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
@@ -161,14 +161,14 @@ public final class RegistryCenterTest {
@Test
public void assertTryLock() {
- registryCenter.tryLock("sharding_db", "t_order", 50L);
- verify(registryRepository).tryLock(eq(new LockNode().getTableLockNodePath("sharding_db", "t_order")), eq(50L), eq(TimeUnit.MILLISECONDS));
+ registryCenter.tryLock(50L);
+ verify(registryRepository).tryLock(eq(new LockNode().getLockNodePath()), eq(50L), eq(TimeUnit.MILLISECONDS));
}
@Test
public void assertReleaseLock() {
- registryCenter.releaseLock("sharding_db", "t_order");
- verify(registryRepository).releaseLock(eq(new LockNode().getTableLockNodePath("sharding_db", "t_order")));
+ registryCenter.releaseLock();
+ verify(registryRepository).releaseLock(eq(new LockNode().getLockNodePath()));
}
@Test
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/AbstractShardingSphereLock.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/AbstractShardingSphereLock.java
deleted file mode 100644
index 6d7e569..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/AbstractShardingSphereLock.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.lock;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Abstract ShardingSphere lock.
- */
-public abstract class AbstractShardingSphereLock implements ShardingSphereLock {
-
- private final Lock lock = new ReentrantLock();
-
- private final Condition condition = lock.newCondition();
-
- @Override
- public final boolean await(final Long timeoutMilliseconds) {
- lock.lock();
- try {
- return condition.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException ignored) {
- } finally {
- lock.unlock();
- }
- return false;
- }
-
- @Override
- public final void signalAll() {
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
- }
- }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java
index d6d3044..c11fd40 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/ShardingSphereLock.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.infra.lock;
+import java.util.Collection;
+
/**
* ShardingSphere lock.
*/
@@ -33,22 +35,26 @@ public interface ShardingSphereLock {
boolean tryLock(String schemaName, String tableName, long timeoutMilliseconds);
/**
- * Release lock.
+ * Try to lock.
+ *
* @param schemaName schema name
- * @param tableName table name
+ * @param tableNames collections of table names
+ * @param timeoutMilliseconds time out milliseconds to acquire lock
+ * @return true if get the lock, false if not
*/
- void releaseLock(String schemaName, String tableName);
+ boolean tryLock(String schemaName, Collection<String> tableNames, long timeoutMilliseconds);
/**
- * Await lock.
- *
- * @param timeoutMilliseconds time out milliseconds to await lock
- * @return true if no exception
+ * Release lock.
+ * @param schemaName schema name
+ * @param tableName table name
*/
- boolean await(Long timeoutMilliseconds);
+ void releaseLock(String schemaName, String tableName);
/**
- * Signal all.
+ * Release lock.
+ * @param schemaName schema name
+ * @param tableNames collection of table names
*/
- void signalAll();
+ void releaseLock(String schemaName, Collection<String> tableNames);
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java
index d297f6a..76a621a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/state/impl/LockDriverState.java
@@ -18,10 +18,7 @@
package org.apache.shardingsphere.driver.governance.internal.state.impl;
import org.apache.shardingsphere.driver.governance.internal.state.DriverState;
-import org.apache.shardingsphere.driver.governance.internal.state.DriverStateContext;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -37,14 +34,7 @@ public final class LockDriverState implements DriverState {
@Override
public Connection getConnection(final Map<String, DataSource> dataSourceMap,
final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts, final TransactionType transactionType) {
- block(metaDataContexts);
- return DriverStateContext.getConnection(dataSourceMap, metaDataContexts, transactionContexts, transactionType);
- }
-
- private void block(final MetaDataContexts metaDataContexts) {
- long lockTimeoutMilliseconds = metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS);
- if (metaDataContexts.getLock().isPresent() && !metaDataContexts.getLock().get().await(lockTimeoutMilliseconds)) {
- throw new ShardingSphereException("Service lock wait timeout of %s ms exceeded", lockTimeoutMilliseconds);
- }
+ // TODO
+ throw new UnsupportedOperationException("LockDriverState");
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
index b730ad1..4d2be54 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
@@ -18,17 +18,9 @@
package org.apache.shardingsphere.proxy.frontend.state.impl;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.exception.BackendException;
-import org.apache.shardingsphere.proxy.backend.exception.LockWaitTimeoutException;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
-import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
-
-import java.util.Optional;
/**
* Lock proxy state.
@@ -37,20 +29,6 @@ public final class LockProxyState implements ProxyState {
@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
- block(context, databaseProtocolFrontendEngine);
- ProxyStateContext.execute(context, message, databaseProtocolFrontendEngine, backendConnection);
- }
-
- private void block(final ChannelHandlerContext context, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine) {
- Long lockTimeoutMilliseconds = ProxyContext.getInstance().getMetaDataContexts().getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS);
- if (ProxyContext.getInstance().getLock().isPresent() && !ProxyContext.getInstance().getLock().get().await(lockTimeoutMilliseconds)) {
- doError(context, databaseProtocolFrontendEngine, new LockWaitTimeoutException(lockTimeoutMilliseconds));
- }
- }
-
- private void doError(final ChannelHandlerContext context, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendException backendException) {
- context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(backendException));
- Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
- databasePacket.ifPresent(context::writeAndFlush);
+ // TODO
}
}