You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/13 08:26:35 UTC

[rocketmq-clients] branch java_dev updated: WiP

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/java_dev by this push:
     new 4ab91cf  WiP
4ab91cf is described below

commit 4ab91cf00c418c6834ca9f59ac51ed72ea04f791
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 13 16:26:29 2022 +0800

    WiP
---
 .../client/java/impl/TelemetrySession.java         | 62 +++++++++++++---------
 1 file changed, 37 insertions(+), 25 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index 36d74a5..d97f84c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -26,13 +26,13 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
-import java.io.UnsupportedEncodingException;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
@@ -48,12 +48,14 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     private final ClientImpl client;
     private final ClientManager clientManager;
     private final Endpoints endpoints;
+    private final ReadWriteLock observerLock;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
     private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
         this.client = client;
         this.clientManager = clientManager;
         this.endpoints = endpoints;
+        this.observerLock = new ReentrantReadWriteLock();
     }
 
     public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
@@ -64,7 +66,7 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     private ListenableFuture<TelemetrySession> register() {
         ListenableFuture<TelemetrySession> future;
         try {
-            this.init();
+            this.prepareObserver();
             final ClientSettings clientSettings = client.getClientSettings();
             final Settings settings = clientSettings.toProtobuf();
             final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
@@ -72,9 +74,7 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
             future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
                 MoreExecutors.directExecutor());
         } catch (Throwable t) {
-            SettableFuture<TelemetrySession> future0 = SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            future = Futures.immediateFailedFuture(t);
         }
         Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
             @Override
@@ -96,31 +96,41 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     /**
      * Release telemetry session.
      */
-    public synchronized void release() {
+    public void release() {
+        this.observerLock.writeLock().lock();
         try {
             if (null != requestObserver) {
-                requestObserver.onCompleted();
+                try {
+                    requestObserver.onCompleted();
+                    // Ignore exception on purpose.
+                } catch (Throwable ignore) {
+                }
+                requestObserver = null;
             }
-        } catch (Throwable ignore) {
-            // Ignore exception on purpose.
+        } finally {
+            this.observerLock.writeLock().unlock();
         }
     }
 
     /**
      * Initialize telemetry session.
      */
-    private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
-        InvalidKeyException, ClientException {
-        this.release();
-        final Metadata metadata = client.sign();
-        this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
-    }
-
-    private void reinit() {
+    private void prepareObserver() throws NoSuchAlgorithmException, InvalidKeyException, ClientException {
+        this.observerLock.readLock().lock();
+        try {
+            if (null != requestObserver) {
+                return;
+            }
+        } finally {
+            this.observerLock.readLock().unlock();
+        }
+        this.observerLock.writeLock().lock();
         try {
-            init();
-        } catch (Throwable ignore) {
-            // Ignore exception on purpose.
+            final Metadata metadata = client.sign();
+            this.requestObserver = clientManager.telemetry(endpoints, metadata,
+                Duration.ofNanos(Long.MAX_VALUE), this);
+        } finally {
+            this.observerLock.readLock().unlock();
         }
     }
 
@@ -129,10 +139,12 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
      *
      * @param command appointed command to telemeter
      */
-    public void telemeter(TelemetryCommand command) {
+    public void telemeter(TelemetryCommand command) throws NoSuchAlgorithmException, ClientException,
+        InvalidKeyException {
         try {
+            this.prepareObserver();
             requestObserver.onNext(command);
-        } catch (RuntimeException e) {
+        } catch (Throwable e) {
             // Cancel RPC.
             requestObserver.onError(e);
             throw e;
@@ -187,11 +199,11 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     public void onError(Throwable throwable) {
         LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}",
             client.getClientId(), endpoints, throwable);
-        reinit();
+        this.release();
     }
 
     @Override
     public void onCompleted() {
-        reinit();
+        this.release();
     }
 }