You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/04/25 08:54:11 UTC
[shardingsphere] branch master updated: Revise #10186 (#10189)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a086b39 Revise #10186 (#10189)
a086b39 is described below
commit a086b3942fc3601870d69c9312cea1787f746dff
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Sun Apr 25 16:53:36 2021 +0800
Revise #10186 (#10189)
* Revise #10186
* Increment the submitted task count before calling execute, and decrement it again if a RejectedExecutionException is caught
---
.../backend/communication/jdbc/connection/BackendConnection.java | 2 +-
.../proxy/frontend/command/CommandExecutorTask.java | 3 +--
.../shardingsphere/proxy/frontend/state/impl/OKProxyState.java | 8 +++++++-
.../proxy/frontend/postgresql/PostgreSQLFrontendEngine.java | 2 +-
4 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 7d1ebad..2d983e5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -73,7 +73,7 @@ public final class BackendConnection implements ExecutorJDBCManager {
@Setter
private volatile CalciteExecutor calciteExecutor;
- private final AtomicInteger runningTaskCount = new AtomicInteger(0);
+ private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 23d8839..0af07b0 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -61,7 +61,6 @@ public final class CommandExecutorTask implements Runnable {
*/
@Override
public void run() {
- backendConnection.getRunningTaskCount().incrementAndGet();
boolean isNeedFlush = false;
try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
ConnectionStatus connectionStatus = backendConnection.getConnectionStatus();
@@ -75,7 +74,7 @@ public final class CommandExecutorTask implements Runnable {
// CHECKSTYLE:ON
processException(ex);
} finally {
- backendConnection.getRunningTaskCount().decrementAndGet();
+ backendConnection.getSubmittedTaskCount().decrementAndGet();
Collection<SQLException> exceptions = closeExecutionResources();
if (isNeedFlush) {
context.flush();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index 562b30b..2cad095 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
/**
* OK proxy state.
@@ -39,6 +40,11 @@ public final class OKProxyState implements ProxyState {
boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
ExecutorService executorService = CommandExecutorSelector.getExecutorService(
isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), context.channel().id());
- executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
+ backendConnection.getSubmittedTaskCount().incrementAndGet();
+ try {
+ executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
+ } catch (final RejectedExecutionException ignored) {
+ backendConnection.getSubmittedTaskCount().decrementAndGet();
+ }
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
index 33f6e88..2f64869 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
@@ -51,7 +51,7 @@ public final class PostgreSQLFrontendEngine implements DatabaseProtocolFrontendE
}
private void waitingForFinish(final BackendConnection backendConnection) {
- while (backendConnection.getRunningTaskCount().get() > 0) {
+ while (backendConnection.getSubmittedTaskCount().get() > 0) {
try {
Thread.sleep(500L);
} catch (final InterruptedException ex) {