You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/04/25 08:54:11 UTC

[shardingsphere] branch master updated: Revise #10186 (#10189)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a086b39  Revise #10186 (#10189)
a086b39 is described below

commit a086b3942fc3601870d69c9312cea1787f746dff
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Sun Apr 25 16:53:36 2021 +0800

    Revise #10186 (#10189)
    
    * Revise #10186
    
    * Increment submitted task count before execute and catch RejectedExecutionException
---
 .../backend/communication/jdbc/connection/BackendConnection.java  | 2 +-
 .../proxy/frontend/command/CommandExecutorTask.java               | 3 +--
 .../shardingsphere/proxy/frontend/state/impl/OKProxyState.java    | 8 +++++++-
 .../proxy/frontend/postgresql/PostgreSQLFrontendEngine.java       | 2 +-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 7d1ebad..2d983e5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -73,7 +73,7 @@ public final class BackendConnection implements ExecutorJDBCManager {
     @Setter
     private volatile CalciteExecutor calciteExecutor;
     
-    private final AtomicInteger runningTaskCount = new AtomicInteger(0);
+    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
     
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 23d8839..0af07b0 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -61,7 +61,6 @@ public final class CommandExecutorTask implements Runnable {
      */
     @Override
     public void run() {
-        backendConnection.getRunningTaskCount().incrementAndGet();
         boolean isNeedFlush = false;
         try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
             ConnectionStatus connectionStatus = backendConnection.getConnectionStatus();
@@ -75,7 +74,7 @@ public final class CommandExecutorTask implements Runnable {
             // CHECKSTYLE:ON
             processException(ex);
         } finally {
-            backendConnection.getRunningTaskCount().decrementAndGet();
+            backendConnection.getSubmittedTaskCount().decrementAndGet();
             Collection<SQLException> exceptions = closeExecutionResources();
             if (isNeedFlush) {
                 context.flush();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index 562b30b..2cad095 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
 import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * OK proxy state.
@@ -39,6 +40,11 @@ public final class OKProxyState implements ProxyState {
         boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
         ExecutorService executorService = CommandExecutorSelector.getExecutorService(
                 isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), context.channel().id());
-        executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
+        backendConnection.getSubmittedTaskCount().incrementAndGet();
+        try {
+            executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
+        } catch (final RejectedExecutionException ignored) {
+            backendConnection.getSubmittedTaskCount().decrementAndGet();
+        }
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
index 33f6e88..2f64869 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
@@ -51,7 +51,7 @@ public final class PostgreSQLFrontendEngine implements DatabaseProtocolFrontendE
     }
     
     private void waitingForFinish(final BackendConnection backendConnection) {
-        while (backendConnection.getRunningTaskCount().get() > 0) {
+        while (backendConnection.getSubmittedTaskCount().get() > 0) {
             try {
                 Thread.sleep(500L);
             } catch (final InterruptedException ex) {