You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/13 08:26:35 UTC
[rocketmq-clients] branch java_dev updated: WiP
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/java_dev by this push:
new 4ab91cf WiP
4ab91cf is described below
commit 4ab91cf00c418c6834ca9f59ac51ed72ea04f791
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 13 16:26:29 2022 +0800
WiP
---
.../client/java/impl/TelemetrySession.java | 62 +++++++++++++---------
1 file changed, 37 insertions(+), 25 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index 36d74a5..d97f84c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -26,13 +26,13 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
-import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
@@ -48,12 +48,14 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
private final ClientImpl client;
private final ClientManager clientManager;
private final Endpoints endpoints;
+ private final ReadWriteLock observerLock;
private volatile StreamObserver<TelemetryCommand> requestObserver;
private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
this.client = client;
this.clientManager = clientManager;
this.endpoints = endpoints;
+ this.observerLock = new ReentrantReadWriteLock();
}
public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
@@ -64,7 +66,7 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
private ListenableFuture<TelemetrySession> register() {
ListenableFuture<TelemetrySession> future;
try {
- this.init();
+ this.prepareObserver();
final ClientSettings clientSettings = client.getClientSettings();
final Settings settings = clientSettings.toProtobuf();
final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
@@ -72,9 +74,7 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
MoreExecutors.directExecutor());
} catch (Throwable t) {
- SettableFuture<TelemetrySession> future0 = SettableFuture.create();
- future0.setException(t);
- future = future0;
+ future = Futures.immediateFailedFuture(t);
}
Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
@Override
@@ -96,31 +96,41 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
/**
* Release telemetry session.
*/
- public synchronized void release() {
+ public void release() {
+ this.observerLock.writeLock().lock();
try {
if (null != requestObserver) {
- requestObserver.onCompleted();
+ try {
+ requestObserver.onCompleted();
+ // Ignore exception on purpose.
+ } catch (Throwable ignore) {
+ }
+ requestObserver = null;
}
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
+ } finally {
+ this.observerLock.writeLock().unlock();
}
}
/**
* Initialize telemetry session.
*/
- private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
- InvalidKeyException, ClientException {
- this.release();
- final Metadata metadata = client.sign();
- this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
- }
-
- private void reinit() {
+ private void prepareObserver() throws NoSuchAlgorithmException, InvalidKeyException, ClientException {
+ this.observerLock.readLock().lock();
+ try {
+ if (null != requestObserver) {
+ return;
+ }
+ } finally {
+ this.observerLock.readLock().unlock();
+ }
+ this.observerLock.writeLock().lock();
try {
- init();
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
+ final Metadata metadata = client.sign();
+ this.requestObserver = clientManager.telemetry(endpoints, metadata,
+ Duration.ofNanos(Long.MAX_VALUE), this);
+ } finally {
+ this.observerLock.writeLock().unlock();
}
}
@@ -129,10 +139,12 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
*
* @param command appointed command to telemeter
*/
- public void telemeter(TelemetryCommand command) {
+ public void telemeter(TelemetryCommand command) throws NoSuchAlgorithmException, ClientException,
+ InvalidKeyException {
try {
+ this.prepareObserver();
requestObserver.onNext(command);
- } catch (RuntimeException e) {
+ } catch (Throwable e) {
// Cancel RPC.
requestObserver.onError(e);
throw e;
@@ -187,11 +199,11 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
public void onError(Throwable throwable) {
LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}",
client.getClientId(), endpoints, throwable);
- reinit();
+ this.release();
}
@Override
public void onCompleted() {
- reinit();
+ this.release();
}
}