You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/05/29 06:25:12 UTC
[shardingsphere] branch master updated: Fix sonar issue of ReentrantLock await (#25927)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c993b7af059 Fix sonar issue of ReentrantLock await (#25927)
c993b7af059 is described below
commit c993b7af0597b6e26c64b0a425741f92197b4e66
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Mon May 29 14:25:05 2023 +0800
Fix sonar issue of ReentrantLock await (#25927)
---
.../sql/process/lock/ProcessOperationLock.java | 14 +++++---
.../process/lock/ProcessOperationLockRegistry.java | 7 +---
.../connector/SocketSinkImporterConnector.java | 23 ++++++------
.../connector/jdbc/connection/ResourceLock.java | 18 ++++++----
.../jdbc/connection/ResourceLockTest.java | 42 ++++++++++++++++++----
.../mysql/command/MySQLCommandExecuteEngine.java | 5 +--
.../command/PostgreSQLCommandExecuteEngine.java | 5 +--
.../PostgreSQLCommandExecuteEngineTest.java | 3 +-
8 files changed, 74 insertions(+), 43 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
index f8d8455f2c9..8f9cc3fb0bf 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
@@ -50,13 +50,19 @@ public final class ProcessOperationLock {
}
/**
- * Await.
- *
+ * Await default time.
+ *
+ * @param releaseStrategy release strategy
* @return boolean
*/
@SneakyThrows(InterruptedException.class)
- public boolean awaitDefaultTime() {
- return condition.await(TIMEOUT_MILLS, TimeUnit.MILLISECONDS);
+ public boolean awaitDefaultTime(final ProcessOperationLockReleaseStrategy releaseStrategy) {
+ while (!releaseStrategy.isReadyToRelease()) {
+ if (condition.await(TIMEOUT_MILLS, TimeUnit.MILLISECONDS)) {
+ return false;
+ }
+ }
+ return true;
}
/**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
index db240a52630..055285b0e02 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
@@ -54,12 +54,7 @@ public final class ProcessOperationLockRegistry {
locks.put(lockId, lock);
lock.lock();
try {
- while (!releaseStrategy.isReadyToRelease()) {
- if (!lock.awaitDefaultTime()) {
- return false;
- }
- }
- return true;
+ return lock.awaitDefaultTime(releaseStrategy);
} finally {
lock.unlock();
locks.remove(lockId);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 905adc5fef8..dfbac42f191 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -128,9 +128,7 @@ public final class SocketSinkImporterConnector implements ImporterConnector, Aut
}
private void writeImmediately(final List<? extends Record> recordList, final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
- while (!channel.isWritable() && channel.isActive()) {
- doAwait();
- }
+ doAwait();
if (!channel.isActive()) {
return;
}
@@ -147,14 +145,17 @@ public final class SocketSinkImporterConnector implements ImporterConnector, Aut
channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
private void doAwait() {
- lock.lock();
- try {
- condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException ignored) {
- Thread.currentThread().interrupt();
- } finally {
- lock.unlock();
+ while (!channel.isWritable() && channel.isActive()) {
+ lock.lock();
+ try {
+ condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ } finally {
+ lock.unlock();
+ }
}
}
@@ -183,7 +184,7 @@ public final class SocketSinkImporterConnector implements ImporterConnector, Aut
* Send incremental start event.
*
* @param socketSinkImporter socket sink importer
- * @param batchSize batch size
+ * @param batchSize batch size
*/
public void sendIncrementalStartEvent(final SocketSinkImporter socketSinkImporter, final int batchSize) {
incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java
index aa78570d1fe..2eaf8c17ffb 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.proxy.backend.connector.jdbc.connection;
+import io.netty.channel.ChannelHandlerContext;
import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;
@@ -37,15 +38,20 @@ public final class ResourceLock {
/**
* Await.
+ *
+ * @param context channel handler context
*/
@SuppressWarnings("ResultOfMethodCallIgnored")
@SneakyThrows(InterruptedException.class)
- public void doAwait() {
- lock.lock();
- try {
- condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
- } finally {
- lock.unlock();
+ public void doAwait(final ChannelHandlerContext context) {
+ while (!context.channel().isWritable() && context.channel().isActive()) {
+ context.flush();
+ lock.lock();
+ try {
+ condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+ } finally {
+ lock.unlock();
+ }
}
}
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java
index 19398f7e1e1..4db57ec9cbb 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java
@@ -17,27 +17,57 @@
package org.apache.shardingsphere.proxy.backend.connector.jdbc.connection;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+@ExtendWith(AutoMockExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
class ResourceLockTest {
+ @Mock
+ private ChannelHandlerContext channelHandlerContext;
+
+ @Mock
+ private Channel channel;
+
+ @Mock
+ private ResourceLock resourceLock;
+
@Test
- void assertDoAwait() {
- ResourceLock resourceLock = new ResourceLock();
- long startTime = System.currentTimeMillis();
- resourceLock.doAwait();
- assertTrue(System.currentTimeMillis() - startTime >= 200L);
+ void assertDoAwait() throws NoSuchFieldException, IllegalAccessException {
+ when(channel.isWritable()).thenReturn(false);
+ when(channel.isActive()).thenReturn(true);
+ when(channelHandlerContext.channel()).thenReturn(channel);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.submit(() -> resourceLock.doAwait(channelHandlerContext));
+ Awaitility.await().pollDelay(200L, TimeUnit.MILLISECONDS).until(() -> true);
+ Plugins.getMemberAccessor().set(ResourceLock.class.getDeclaredField("condition"), resourceLock, new ReentrantLock().newCondition());
+ verify(resourceLock, times(1)).doAwait(channelHandlerContext);
}
@Test
void assertDoNotify() {
+ when(channel.isWritable()).thenReturn(true);
+ when(channel.isActive()).thenReturn(true);
+ when(channelHandlerContext.channel()).thenReturn(channel);
ResourceLock resourceLock = new ResourceLock();
long startTime = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(1);
@@ -45,7 +75,7 @@ class ResourceLockTest {
Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(() -> true);
resourceLock.doNotify();
});
- resourceLock.doAwait();
+ resourceLock.doAwait(channelHandlerContext);
assertTrue(System.currentTimeMillis() > startTime);
}
}
diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 80e6590e00e..481b30c7b55 100644
--- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -78,10 +78,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
int flushThreshold = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
while (queryCommandExecutor.next()) {
count++;
- while (!context.channel().isWritable() && context.channel().isActive()) {
- context.flush();
- databaseConnectionManager.getResourceLock().doAwait();
- }
+ databaseConnectionManager.getResourceLock().doAwait(context);
DatabasePacket dataValue = queryCommandExecutor.getQueryRowPacket();
context.write(dataValue);
if (flushThreshold == count) {
diff --git a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index a1f6fcbc656..4f22b96aa7e 100644
--- a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -108,10 +108,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
.getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
while (queryCommandExecutor.next()) {
flushCount++;
- while (!context.channel().isWritable() && context.channel().isActive()) {
- context.flush();
- databaseConnectionManager.getResourceLock().doAwait();
- }
+ databaseConnectionManager.getResourceLock().doAwait(context);
DatabasePacket resultValue = queryCommandExecutor.getQueryRowPacket();
context.write(resultValue);
if (proxyFrontendFlushThreshold == flushCount) {
diff --git a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
index c835b3d34b0..d1981c8bb18 100644
--- a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
+++ b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
@@ -123,10 +123,9 @@ class PostgreSQLCommandExecuteEngineTest {
ContextManager contextManager = new ContextManager(new MetaDataContexts(mock(MetaDataPersistService.class), new ShardingSphereMetaData()), mock(InstanceContext.class));
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
commandExecuteEngine.writeQueryData(channelHandlerContext, databaseConnectionManager, queryCommandExecutor, 0);
- verify(resourceLock).doAwait();
+ verify(resourceLock).doAwait(channelHandlerContext);
verify(channelHandlerContext).write(packet);
verify(channelHandlerContext).write(isA(PostgreSQLCommandCompletePacket.class));
- verify(channelHandlerContext).flush();
verify(channelHandlerContext).write(isA(PostgreSQLReadyForQueryPacket.class));
}
}