You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/05/29 06:25:12 UTC

[shardingsphere] branch master updated: Fix sonar issue of ReentrantLock await (#25927)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c993b7af059 Fix sonar issue of ReentrantLock await (#25927)
c993b7af059 is described below

commit c993b7af0597b6e26c64b0a425741f92197b4e66
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Mon May 29 14:25:05 2023 +0800

    Fix sonar issue of ReentrantLock await (#25927)
---
 .../sql/process/lock/ProcessOperationLock.java     | 14 +++++---
 .../process/lock/ProcessOperationLockRegistry.java |  7 +---
 .../connector/SocketSinkImporterConnector.java     | 23 ++++++------
 .../connector/jdbc/connection/ResourceLock.java    | 18 ++++++----
 .../jdbc/connection/ResourceLockTest.java          | 42 ++++++++++++++++++----
 .../mysql/command/MySQLCommandExecuteEngine.java   |  5 +--
 .../command/PostgreSQLCommandExecuteEngine.java    |  5 +--
 .../PostgreSQLCommandExecuteEngineTest.java        |  3 +-
 8 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
index f8d8455f2c9..8f9cc3fb0bf 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
@@ -50,13 +50,19 @@ public final class ProcessOperationLock {
     }
     
     /**
-     * Await.
-     * 
+     * Await default time.
+     *
+     * @param releaseStrategy release strategy
      * @return boolean
      */
     @SneakyThrows(InterruptedException.class)
-    public boolean awaitDefaultTime() {
-        return condition.await(TIMEOUT_MILLS, TimeUnit.MILLISECONDS);
+    public boolean awaitDefaultTime(final ProcessOperationLockReleaseStrategy releaseStrategy) {
+        while (!releaseStrategy.isReadyToRelease()) {
+            if (condition.await(TIMEOUT_MILLS, TimeUnit.MILLISECONDS)) {
+                return false;
+            }
+        }
+        return true;
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
index db240a52630..055285b0e02 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
@@ -54,12 +54,7 @@ public final class ProcessOperationLockRegistry {
         locks.put(lockId, lock);
         lock.lock();
         try {
-            while (!releaseStrategy.isReadyToRelease()) {
-                if (!lock.awaitDefaultTime()) {
-                    return false;
-                }
-            }
-            return true;
+            return lock.awaitDefaultTime(releaseStrategy);
         } finally {
             lock.unlock();
             locks.remove(lockId);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 905adc5fef8..dfbac42f191 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -128,9 +128,7 @@ public final class SocketSinkImporterConnector implements ImporterConnector, Aut
     }
     
     private void writeImmediately(final List<? extends Record> recordList, final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
-        while (!channel.isWritable() && channel.isActive()) {
-            doAwait();
-        }
+        doAwait();
         if (!channel.isActive()) {
             return;
         }
@@ -147,14 +145,17 @@ public final class SocketSinkImporterConnector implements ImporterConnector, Aut
         channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
     }
     
+    @SuppressWarnings("ResultOfMethodCallIgnored")
     private void doAwait() {
-        lock.lock();
-        try {
-            condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
-        } catch (final InterruptedException ignored) {
-            Thread.currentThread().interrupt();
-        } finally {
-            lock.unlock();
+        while (!channel.isWritable() && channel.isActive()) {
+            lock.lock();
+            try {
+                condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+            } catch (final InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+            } finally {
+                lock.unlock();
+            }
         }
     }
     
@@ -183,7 +184,7 @@ public final class SocketSinkImporterConnector implements ImporterConnector, Aut
      * Send incremental start event.
      *
      * @param socketSinkImporter socket sink importer
-     * @param batchSize batch size
+     * @param batchSize          batch size
      */
     public void sendIncrementalStartEvent(final SocketSinkImporter socketSinkImporter, final int batchSize) {
         incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java
index aa78570d1fe..2eaf8c17ffb 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLock.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.connector.jdbc.connection;
 
+import io.netty.channel.ChannelHandlerContext;
 import lombok.SneakyThrows;
 
 import java.util.concurrent.TimeUnit;
@@ -37,15 +38,20 @@ public final class ResourceLock {
     
     /**
      * Await.
+     *
+     * @param context channel handler context
      */
     @SuppressWarnings("ResultOfMethodCallIgnored")
     @SneakyThrows(InterruptedException.class)
-    public void doAwait() {
-        lock.lock();
-        try {
-            condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
-        } finally {
-            lock.unlock();
+    public void doAwait(final ChannelHandlerContext context) {
+        while (!context.channel().isWritable() && context.channel().isActive()) {
+            context.flush();
+            lock.lock();
+            try {
+                condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+            } finally {
+                lock.unlock();
+            }
         }
     }
     
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java
index 19398f7e1e1..4db57ec9cbb 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/connection/ResourceLockTest.java
@@ -17,27 +17,57 @@
 
 package org.apache.shardingsphere.proxy.backend.connector.jdbc.connection;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
+@ExtendWith(AutoMockExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
 class ResourceLockTest {
     
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+    
+    @Mock
+    private Channel channel;
+    
+    @Mock
+    private ResourceLock resourceLock;
+    
     @Test
-    void assertDoAwait() {
-        ResourceLock resourceLock = new ResourceLock();
-        long startTime = System.currentTimeMillis();
-        resourceLock.doAwait();
-        assertTrue(System.currentTimeMillis() - startTime >= 200L);
+    void assertDoAwait() throws NoSuchFieldException, IllegalAccessException {
+        when(channel.isWritable()).thenReturn(false);
+        when(channel.isActive()).thenReturn(true);
+        when(channelHandlerContext.channel()).thenReturn(channel);
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        executorService.submit(() -> resourceLock.doAwait(channelHandlerContext));
+        Awaitility.await().pollDelay(200L, TimeUnit.MILLISECONDS).until(() -> true);
+        Plugins.getMemberAccessor().set(ResourceLock.class.getDeclaredField("condition"), resourceLock, new ReentrantLock().newCondition());
+        verify(resourceLock, times(1)).doAwait(channelHandlerContext);
     }
     
     @Test
     void assertDoNotify() {
+        when(channel.isWritable()).thenReturn(true);
+        when(channel.isActive()).thenReturn(true);
+        when(channelHandlerContext.channel()).thenReturn(channel);
         ResourceLock resourceLock = new ResourceLock();
         long startTime = System.currentTimeMillis();
         ExecutorService executorService = Executors.newFixedThreadPool(1);
@@ -45,7 +75,7 @@ class ResourceLockTest {
             Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(() -> true);
             resourceLock.doNotify();
         });
-        resourceLock.doAwait();
+        resourceLock.doAwait(channelHandlerContext);
         assertTrue(System.currentTimeMillis() > startTime);
     }
 }
diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 80e6590e00e..481b30c7b55 100644
--- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -78,10 +78,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
         int flushThreshold = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
         while (queryCommandExecutor.next()) {
             count++;
-            while (!context.channel().isWritable() && context.channel().isActive()) {
-                context.flush();
-                databaseConnectionManager.getResourceLock().doAwait();
-            }
+            databaseConnectionManager.getResourceLock().doAwait(context);
             DatabasePacket dataValue = queryCommandExecutor.getQueryRowPacket();
             context.write(dataValue);
             if (flushThreshold == count) {
diff --git a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index a1f6fcbc656..4f22b96aa7e 100644
--- a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -108,10 +108,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
                 .getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
         while (queryCommandExecutor.next()) {
             flushCount++;
-            while (!context.channel().isWritable() && context.channel().isActive()) {
-                context.flush();
-                databaseConnectionManager.getResourceLock().doAwait();
-            }
+            databaseConnectionManager.getResourceLock().doAwait(context);
             DatabasePacket resultValue = queryCommandExecutor.getQueryRowPacket();
             context.write(resultValue);
             if (proxyFrontendFlushThreshold == flushCount) {
diff --git a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
index c835b3d34b0..d1981c8bb18 100644
--- a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
+++ b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
@@ -123,10 +123,9 @@ class PostgreSQLCommandExecuteEngineTest {
         ContextManager contextManager = new ContextManager(new MetaDataContexts(mock(MetaDataPersistService.class), new ShardingSphereMetaData()), mock(InstanceContext.class));
         when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
         commandExecuteEngine.writeQueryData(channelHandlerContext, databaseConnectionManager, queryCommandExecutor, 0);
-        verify(resourceLock).doAwait();
+        verify(resourceLock).doAwait(channelHandlerContext);
         verify(channelHandlerContext).write(packet);
         verify(channelHandlerContext).write(isA(PostgreSQLCommandCompletePacket.class));
-        verify(channelHandlerContext).flush();
         verify(channelHandlerContext).write(isA(PostgreSQLReadyForQueryPacket.class));
     }
 }