You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/02 12:27:40 UTC

[rocketmq-clients] branch master updated: Remove redundant clientSession

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aa0ee1c Remove redundant clientSession
8aa0ee1c is described below

commit 8aa0ee1c066c890edacb1a728ad8b07d61397a2f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Dec 2 20:21:57 2022 +0800

    Remove redundant clientSession
---
 .../java/org/apache/rocketmq/client/java/impl/ClientImpl.java    | 9 +++++++++
 .../org/apache/rocketmq/client/java/impl/ClientSessionImpl.java  | 1 +
 .../rocketmq/client/java/impl/producer/ClientSessionHandler.java | 6 ++++++
 3 files changed, 16 insertions(+)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 91d7ba3b..b3981b60 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -359,6 +359,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         }
     }
 
+    public void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession) {
+        sessionsLock.writeLock().lock();
+        try {
+            sessionsTable.remove(endpoints, clientSession);
+        } finally {
+            sessionsLock.writeLock().unlock();
+        }
+    }
+
     private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientException {
         sessionsLock.readLock().lock();
         try {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 2d691512..663b62ed 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -65,6 +65,7 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
             if (sessionHandler.isEndpointsDeprecated(endpoints)) {
                 log.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}, clientId={}",
                     endpoints, clientId);
+                sessionHandler.removeClientSession(endpoints, this);
                 return;
             }
             log.info("Try to renew requestObserver, endpoints={}, clientId={}", endpoints, clientId);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 4464b4af..9e68692a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -25,6 +25,7 @@ import apache.rocketmq.v2.VerifyMessageCommand;
 import io.grpc.stub.StreamObserver;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
@@ -47,6 +48,11 @@ public interface ClientSessionHandler {
      */
     boolean isEndpointsDeprecated(Endpoints endpoints);
 
+    /**
+     * Remove client session.
+     */
+    void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession);
+
     /**
      * Indicates the client identifier.
      *