You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/02 12:27:40 UTC
[rocketmq-clients] branch master updated: Remove redundant clientSession
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8aa0ee1c Remove redundant clientSession
8aa0ee1c is described below
commit 8aa0ee1c066c890edacb1a728ad8b07d61397a2f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Dec 2 20:21:57 2022 +0800
Remove redundant clientSession
---
.../java/org/apache/rocketmq/client/java/impl/ClientImpl.java | 9 +++++++++
.../org/apache/rocketmq/client/java/impl/ClientSessionImpl.java | 1 +
.../rocketmq/client/java/impl/producer/ClientSessionHandler.java | 6 ++++++
3 files changed, 16 insertions(+)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 91d7ba3b..b3981b60 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -359,6 +359,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
}
+ public void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession) {
+ sessionsLock.writeLock().lock();
+ try {
+ sessionsTable.remove(endpoints, clientSession);
+ } finally {
+ sessionsLock.writeLock().unlock();
+ }
+ }
+
private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientException {
sessionsLock.readLock().lock();
try {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 2d691512..663b62ed 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -65,6 +65,7 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
if (sessionHandler.isEndpointsDeprecated(endpoints)) {
log.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}, clientId={}",
endpoints, clientId);
+ sessionHandler.removeClientSession(endpoints, this);
return;
}
log.info("Try to renew requestObserver, endpoints={}, clientId={}", endpoints, clientId);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 4464b4af..9e68692a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -25,6 +25,7 @@ import apache.rocketmq.v2.VerifyMessageCommand;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -47,6 +48,11 @@ public interface ClientSessionHandler {
*/
boolean isEndpointsDeprecated(Endpoints endpoints);
+ /**
+ * Remove client session.
+ */
+ void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession);
+
/**
* Indicates the client identifier.
*