You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/13 11:36:06 UTC
[rocketmq-clients] 01/01: Refactor client telemetry
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 9c0e2fb438eba3fbad4197127ae4d25bbe1bd973
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 13 17:08:55 2022 +0800
Refactor client telemetry
---
.../java/exception/InternalErrorException.java | 4 +
.../apache/rocketmq/client/java/impl/Client.java | 6 +-
.../rocketmq/client/java/impl/ClientImpl.java | 101 ++++++++++-----
.../client/java/impl/ClientManagerImpl.java | 8 +-
.../client/java/impl/ClientManagerRegistry.java | 4 +-
...elemetrySession.java => ClientSessionImpl.java} | 138 +++++++++------------
.../java/impl/consumer/ProcessQueueImpl.java | 36 +++---
.../java/impl/producer/ClientSessionProcessor.java | 47 +++++++
.../client/java/metrics/ClientMeterProvider.java | 2 +-
.../java/metrics/MessageMeterInterceptor.java | 14 +--
.../java/impl/consumer/PushConsumerImplTest.java | 4 +-
.../java/impl/consumer/SimpleConsumerImplTest.java | 4 +-
.../java/impl/producer/ProducerImplTest.java | 12 +-
13 files changed, 225 insertions(+), 155 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index e1a44a4..f42d369 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -27,4 +27,8 @@ public class InternalErrorException extends ClientException {
public InternalErrorException(int responseCode, String message) {
super(responseCode, message);
}
+
+ public InternalErrorException(Throwable cause) {
+ super(cause);
+ }
}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
index b106db3..b0ac5f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
@@ -25,7 +25,7 @@ public interface Client {
*
* @return unique client identifier.
*/
- String getClientId();
+ String clientId();
/**
* Send heart beat to remote {@link Endpoints}.
@@ -33,9 +33,9 @@ public interface Client {
void doHeartbeat();
/**
- * Voluntary announce settings to remote.
+ * Sync settings to remote.
*/
- void telemeterSettings();
+ void syncSettings();
/**
* Do some stats for client.
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 3cf8a36..99b292d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -42,11 +42,13 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
+import io.grpc.stub.StreamObserver;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -67,10 +69,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
import org.apache.rocketmq.client.java.metrics.Metric;
@@ -86,7 +90,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public abstract class ClientImpl extends AbstractIdleService implements Client, MessageInterceptor {
+public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionProcessor,
+ MessageInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class);
private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP = Duration.ofSeconds(3);
@@ -111,9 +116,9 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
private final Map<String /* topic */, Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable;
private final Lock inflightRouteFutureLock;
- @GuardedBy("telemetrySessionsLock")
- private final ConcurrentMap<Endpoints, TelemetrySession> telemetrySessionTable;
- private final ReadWriteLock telemetrySessionsLock;
+ @GuardedBy("endpointsSessionsLock")
+ private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable;
+ private final ReadWriteLock endpointsSessionsLock;
@GuardedBy("messageInterceptorsLock")
private final List<MessageInterceptor> messageInterceptors;
@@ -131,8 +136,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
this.inflightRouteFutureTable = new ConcurrentHashMap<>();
this.inflightRouteFutureLock = new ReentrantLock();
- this.telemetrySessionTable = new ConcurrentHashMap<>();
- this.telemetrySessionsLock = new ReentrantReadWriteLock();
+ this.endpointsSessionTable = new HashMap<>();
+ this.endpointsSessionsLock = new ReentrantReadWriteLock();
this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -274,13 +279,39 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
}
+ @Override
+ public TelemetryCommand getSettingsCommand() {
+ final Settings settings = this.getClientSettings().toProtobuf();
+ return TelemetryCommand.newBuilder().setSettings(settings).build();
+ }
+
+ @Override
+ public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
+ StreamObserver<TelemetryCommand> observer) throws ClientException {
+ try {
+ final Metadata metadata = this.sign();
+ return clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), observer);
+ } catch (ClientException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new InternalErrorException(t);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> register() {
+ return Futures.transformAsync(this.getClientSettings().arrivedFuture,
+ (clientSettings) -> Futures.immediateVoidFuture(), clientCallbackExecutor);
+ }
+
/**
* This method is invoked while request of printing thread stack trace is received from remote.
*
* @param endpoints remote endpoints.
* @param command request of printing thread stack trace from remote.
*/
- void onPrintThreadStackCommand(Endpoints endpoints, PrintThreadStackTraceCommand command) {
+ @Override
+ public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command) {
final String nonce = command.getNonce();
Runnable task = () -> {
try {
@@ -314,6 +345,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
* @param endpoints remote endpoints.
* @param settings settings received from remote.
*/
+ @Override
public final void onSettingsCommand(Endpoints endpoints, Settings settings) {
final Metric metric = new Metric(settings.getMetric());
clientMeterProvider.reset(metric);
@@ -322,10 +354,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
/**
- * @see Client#telemeterSettings()
+ * @see Client#syncSettings()
*/
@Override
- public void telemeterSettings() {
+ public void syncSettings() {
final Settings settings = getClientSettings().toProtobuf();
final TelemetryCommand command = TelemetryCommand.newBuilder().setSettings(settings).build();
final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
@@ -345,12 +377,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
* @param command command to telemeter.
*/
public void telemeter(Endpoints endpoints, TelemetryCommand command) {
- final ListenableFuture<TelemetrySession> future = registerTelemetrySession(endpoints);
- Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
+ final ListenableFuture<ClientSessionImpl> future = registerTelemetrySession(endpoints);
+ Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
@Override
- public void onSuccess(TelemetrySession session) {
+ public void onSuccess(ClientSessionImpl session) {
try {
- session.telemeter(command);
+ session.publish(command);
} catch (Throwable t) {
LOGGER.error("Failed to telemeter command, endpoints={}, command={}", endpoints, command);
}
@@ -364,43 +396,44 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
private void releaseTelemetrySessions() {
- telemetrySessionsLock.readLock().lock();
+ endpointsSessionsLock.readLock().lock();
try {
- telemetrySessionTable.values().forEach(TelemetrySession::release);
+ endpointsSessionTable.values().forEach(ClientSessionImpl::release);
} finally {
- telemetrySessionsLock.readLock().unlock();
+ endpointsSessionsLock.readLock().unlock();
}
}
/**
* Try to register telemetry session, return it directly if session is existed already.
*/
- public ListenableFuture<TelemetrySession> registerTelemetrySession(Endpoints endpoints) {
- final SettableFuture<TelemetrySession> future0 = SettableFuture.create();
- telemetrySessionsLock.readLock().lock();
+ public ListenableFuture<ClientSessionImpl> registerTelemetrySession(Endpoints endpoints) {
+ final SettableFuture<ClientSessionImpl> future0 = SettableFuture.create();
+ endpointsSessionsLock.readLock().lock();
try {
- TelemetrySession telemetrySession = telemetrySessionTable.get(endpoints);
+ ClientSessionImpl clientSessionImpl = endpointsSessionTable.get(endpoints);
// Return is directly if session is existed already.
- if (null != telemetrySession) {
- future0.set(telemetrySession);
+ if (null != clientSessionImpl) {
+ future0.set(clientSessionImpl);
return future0;
}
} finally {
- telemetrySessionsLock.readLock().unlock();
+ endpointsSessionsLock.readLock().unlock();
}
// Future's exception has been logged during the registration.
- final ListenableFuture<TelemetrySession> future = TelemetrySession.register(this, clientManager, endpoints);
+ final ListenableFuture<ClientSessionImpl> future = new ClientSessionImpl(this, endpoints).register();
return Futures.transform(future, session -> {
- telemetrySessionsLock.writeLock().lock();
+ endpointsSessionsLock.writeLock().lock();
try {
- TelemetrySession existed = telemetrySessionTable.get(endpoints);
+ ClientSessionImpl existed = endpointsSessionTable.get(endpoints);
if (null != existed) {
+ session.release();
return existed;
}
- telemetrySessionTable.put(endpoints, session);
+ endpointsSessionTable.put(endpoints, session);
return session;
} finally {
- telemetrySessionsLock.writeLock().unlock();
+ endpointsSessionsLock.writeLock().unlock();
}
}, MoreExecutors.directExecutor());
}
@@ -412,7 +445,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
*/
public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic,
TopicRouteDataResult topicRouteDataResult) {
- final ListenableFuture<List<TelemetrySession>> future =
+ final ListenableFuture<List<ClientSessionImpl>> future =
Futures.allAsList(topicRouteDataResult.getTopicRouteData()
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
@@ -420,9 +453,9 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
.stream().map(this::registerTelemetrySession)
.collect(Collectors.toList()));
SettableFuture<Void> future0 = SettableFuture.create();
- Futures.addCallback(future, new FutureCallback<List<TelemetrySession>>() {
+ Futures.addCallback(future, new FutureCallback<List<ClientSessionImpl>>() {
@Override
- public void onSuccess(List<TelemetrySession> sessions) {
+ public void onSuccess(List<ClientSessionImpl> sessions) {
LOGGER.info("Register session successfully, current route will be cached, topic={}, "
+ "topicRouteDataResult={}", topic, topicRouteDataResult);
final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
@@ -459,6 +492,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
* @param endpoints remote endpoints.
* @param command request of message consume verification from remote.
*/
+ @Override
public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) {
LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}",
clientId, command);
@@ -482,6 +516,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
* @param endpoints remote endpoints.
* @param command request of orphaned transaction recovery from remote.
*/
+ @Override
public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
+ "command={}", clientId, command);
@@ -532,10 +567,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
/**
- * @see Client#getClientId()
+ * @see Client#clientId()
*/
@Override
- public String getClientId() {
+ public String clientId() {
return clientId;
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 337edcc..b564634 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -131,12 +131,12 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
@Override
public void registerClient(Client client) {
- clientTable.put(client.getClientId(), client);
+ clientTable.put(client.clientId(), client);
}
@Override
public void unregisterClient(Client client) {
- clientTable.remove(client.getClientId());
+ clientTable.remove(client.clientId());
}
@Override
@@ -189,9 +189,9 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
private void syncSettings() {
clientTable.values().forEach(client -> {
try {
- client.telemeterSettings();
+ client.syncSettings();
} catch (Throwable t) {
- LOGGER.error("Failed to announce settings, clientId={}", client.getClientId(), t);
+ LOGGER.error("Failed to announce settings, clientId={}", client.clientId(), t);
}
});
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
index 42dc30d..80299da 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
@@ -57,7 +57,7 @@ public class ClientManagerRegistry {
clientManager.startAsync().awaitRunning();
singletonClientManager = clientManager;
}
- clientIds.add(client.getClientId());
+ clientIds.add(client.clientId());
singletonClientManager.registerClient(client);
return singletonClientManager;
} finally {
@@ -77,7 +77,7 @@ public class ClientManagerRegistry {
ClientManagerImpl clientManager = null;
clientIdsLock.lock();
try {
- clientIds.remove(client.getClientId());
+ clientIds.remove(client.clientId());
singletonClientManager.unregisterClient(client);
if (clientIds.isEmpty()) {
clientManager = singletonClientManager;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
similarity index 54%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 36d74a5..d6a19c2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -26,14 +26,11 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
-import java.io.UnsupportedEncodingException;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,51 +39,39 @@ import org.slf4j.LoggerFactory;
* Telemetry session is constructed before first communication between client and remote route endpoints.
*/
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public class TelemetrySession implements StreamObserver<TelemetryCommand> {
- private static final Logger LOGGER = LoggerFactory.getLogger(TelemetrySession.class);
+public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionImpl.class);
- private final ClientImpl client;
- private final ClientManager clientManager;
+ private final ClientSessionProcessor processor;
private final Endpoints endpoints;
- private volatile StreamObserver<TelemetryCommand> requestObserver;
+ private final ReadWriteLock observerLock;
+ private StreamObserver<TelemetryCommand> requestObserver = null;
- private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
- this.client = client;
- this.clientManager = clientManager;
+ protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints endpoints) {
+ this.processor = processor;
this.endpoints = endpoints;
+ this.observerLock = new ReentrantReadWriteLock();
}
- public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
- Endpoints endpoints) {
- return new TelemetrySession(client, clientManager, endpoints).register();
- }
-
- private ListenableFuture<TelemetrySession> register() {
- ListenableFuture<TelemetrySession> future;
+ protected ListenableFuture<ClientSessionImpl> register() {
+ ListenableFuture<ClientSessionImpl> future;
try {
- this.init();
- final ClientSettings clientSettings = client.getClientSettings();
- final Settings settings = clientSettings.toProtobuf();
- final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
- this.telemeter(settingsCommand);
- future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
- MoreExecutors.directExecutor());
+ final TelemetryCommand command = processor.getSettingsCommand();
+ this.publish(command);
+ future = Futures.transform(processor.register(), input -> this, MoreExecutors.directExecutor());
} catch (Throwable t) {
- SettableFuture<TelemetrySession> future0 = SettableFuture.create();
- future0.setException(t);
- future = future0;
+ future = Futures.immediateFailedFuture(t);
}
- Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
+ final String clientId = processor.clientId();
+ Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
@Override
- public void onSuccess(TelemetrySession session) {
- LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints,
- client.getClientId());
+ public void onSuccess(ClientSessionImpl session) {
+ LOGGER.info("Register client session successfully, endpoints={}, clientId={}", endpoints, clientId);
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints,
- client.getClientId(), t);
+ LOGGER.error("Failed to register client session, endpoints={}, clientId={}", endpoints, clientId, t);
release();
}
}, MoreExecutors.directExecutor());
@@ -96,31 +81,19 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
/**
* Release telemetry session.
*/
- public synchronized void release() {
+ public void release() {
+ this.observerLock.writeLock().lock();
try {
if (null != requestObserver) {
- requestObserver.onCompleted();
+ try {
+ requestObserver.onCompleted();
+ } catch (Throwable ignore) {
+ // Ignore exception on purpose.
+ }
+ requestObserver = null;
}
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
- }
- }
-
- /**
- * Initialize telemetry session.
- */
- private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
- InvalidKeyException, ClientException {
- this.release();
- final Metadata metadata = client.sign();
- this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
- }
-
- private void reinit() {
- try {
- init();
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
+ } finally {
+ this.observerLock.writeLock().unlock();
}
}
@@ -129,13 +102,24 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
*
* @param command appointed command to telemeter
*/
- public void telemeter(TelemetryCommand command) {
+ public void publish(TelemetryCommand command) throws ClientException {
+ this.observerLock.readLock().lock();
try {
+ if (null != requestObserver) {
+ requestObserver.onNext(command);
+ return;
+ }
+ } finally {
+ this.observerLock.readLock().unlock();
+ }
+ this.observerLock.writeLock().lock();
+ try {
+ if (null == requestObserver) {
+ this.requestObserver = processor.telemetry(endpoints, this);
+ }
requestObserver.onNext(command);
- } catch (RuntimeException e) {
- // Cancel RPC.
- requestObserver.onError(e);
- throw e;
+ } finally {
+ this.observerLock.writeLock().unlock();
}
}
@@ -146,52 +130,52 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
case SETTINGS: {
final Settings settings = command.getSettings();
LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
- client.getClientId());
- client.onSettingsCommand(endpoints, settings);
+ processor.clientId());
+ processor.onSettingsCommand(endpoints, settings);
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
command.getRecoverOrphanedTransactionCommand();
LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
- + "clientId={}", endpoints, client.getClientId());
- client.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
+ + "clientId={}", endpoints, processor.clientId());
+ processor.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
break;
}
case VERIFY_MESSAGE_COMMAND: {
final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
- endpoints, client.getClientId());
- client.onVerifyMessageCommand(endpoints, verifyMessageCommand);
+ endpoints, processor.clientId());
+ processor.onVerifyMessageCommand(endpoints, verifyMessageCommand);
break;
}
case PRINT_THREAD_STACK_TRACE_COMMAND: {
final PrintThreadStackTraceCommand printThreadStackTraceCommand =
command.getPrintThreadStackTraceCommand();
LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
- endpoints, client.getClientId());
- client.onPrintThreadStackCommand(endpoints, printThreadStackTraceCommand);
+ endpoints, processor.clientId());
+ processor.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
break;
}
default:
LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
- endpoints, command, client.getClientId());
+ endpoints, command, processor.clientId());
}
} catch (Throwable t) {
LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
- + "clientId={}", command, client.getClientId(), t);
+ + "clientId={}", command, processor.clientId(), t);
}
}
@Override
public void onError(Throwable throwable) {
LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}",
- client.getClientId(), endpoints, throwable);
- reinit();
+ processor.clientId(), endpoints, throwable);
+ this.release();
}
@Override
public void onCompleted() {
- reinit();
+ this.release();
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index c914dce..bdc6a6d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -128,7 +128,7 @@ class ProcessQueueImpl implements ProcessQueue {
return false;
}
LOGGER.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, mq={}, clientId={}", idleDuration,
- maxIdleDuration, mq, consumer.getClientId());
+ maxIdleDuration, mq, consumer.clientId());
return true;
}
@@ -157,12 +157,12 @@ class ProcessQueueImpl implements ProcessQueue {
final MessageId messageId = messageView.getMessageId();
if (consumer.getPushConsumerSettings().isFifo()) {
LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, " +
- "messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+ "messageId={}, clientId={}", mq, messageId, consumer.clientId());
forwardToDeadLetterQueue(messageView);
return;
}
LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq,
- messageId, consumer.getClientId());
+ messageId, consumer.clientId());
nackMessage(messageView);
});
}
@@ -194,7 +194,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
// Should never reach here.
LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
- consumer.getClientId(), t);
+ consumer.clientId(), t);
receiveMessageLater();
}
}
@@ -202,12 +202,12 @@ class ProcessQueueImpl implements ProcessQueue {
public void receiveMessage() {
if (dropped) {
LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq,
- consumer.getClientId());
+ consumer.clientId());
return;
}
if (this.isCacheFull()) {
LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
- consumer.getClientId());
+ consumer.clientId());
receiveMessageLater();
return;
}
@@ -217,7 +217,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void receiveMessageImmediately() {
if (!consumer.isRunning()) {
LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq,
- consumer.getClientId());
+ consumer.clientId());
return;
}
try {
@@ -251,7 +251,7 @@ class ProcessQueueImpl implements ProcessQueue {
// Should never reach here.
LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
" mq={}, endpoints={}, clientId={}",
- mq, endpoints, consumer.getClientId(), t);
+ mq, endpoints, consumer.clientId(), t);
receiveMessageLater();
}
}
@@ -264,14 +264,14 @@ class ProcessQueueImpl implements ProcessQueue {
MessageHookPointsStatus.ERROR);
LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
- " clientId={}", mq, endpoints, consumer.getClientId(), t);
+ " clientId={}", mq, endpoints, consumer.clientId(), t);
receiveMessageLater();
}
}, MoreExecutors.directExecutor());
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
- consumer.getClientId(), t);
+ consumer.clientId(), t);
receiveMessageLater();
}
}
@@ -282,7 +282,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," +
" mq={}, clientId={}",
- cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.getClientId());
+ cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.clientId());
return true;
}
final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue();
@@ -290,7 +290,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," +
" actual={} bytes, mq={}, clientId={}",
- cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.getClientId());
+ cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.clientId());
return true;
}
return false;
@@ -384,7 +384,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
private void ackMessage(MessageViewImpl messageView) {
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
@@ -450,7 +450,7 @@ class ProcessQueueImpl implements ProcessQueue {
int attempt = messageView.getDeliveryAttempt();
final MessageId messageId = messageView.getMessageId();
final ConsumeService service = consumer.getConsumeService();
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
@@ -482,7 +482,7 @@ class ProcessQueueImpl implements ProcessQueue {
final SettableFuture<Void> future0) {
final ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future =
consumer.forwardMessageToDeadLetterQueue(messageView);
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
@@ -528,7 +528,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void forwardToDeadLetterQueueLater(final MessageViewImpl messageView, final int attempt,
final SettableFuture<Void> future0) {
final MessageId messageId = messageView.getMessageId();
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
// Process queue is dropped, no need to proceed.
if (dropped) {
LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}," +
@@ -558,7 +558,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void ackFifoMessage(final MessageViewImpl messageView, final int attempt,
final SettableFuture<Void> future0) {
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
@@ -606,7 +606,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void ackFifoMessageLater(final MessageViewImpl messageView, final int attempt,
final SettableFuture<Void> future0) {
final MessageId messageId = messageView.getMessageId();
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
// Process queue is dropped, no need to proceed.
if (dropped) {
LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}",
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
new file mode 100644
index 0000000..8bdc019
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.producer;
+
+import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.TelemetryCommand;
+import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.route.Endpoints;
+
+public interface ClientSessionProcessor {
+ ListenableFuture<Void> register();
+
+ String clientId();
+
+ TelemetryCommand getSettingsCommand();
+
+ StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> observer)
+ throws ClientException;
+
+ void onSettingsCommand(Endpoints endpoints, Settings settings);
+
+ void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command);
+
+ void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command);
+
+ void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command);
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
index 9451028..3547a96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
@@ -73,7 +73,7 @@ public class ClientMeterProvider {
}
public synchronized void reset(Metric metric) {
- final String clientId = client.getClientId();
+ final String clientId = client.clientId();
try {
if (clientMeter.satisfy(metric)) {
LOGGER.debug("Metric settings is satisfied by the current message meter, clientId={}", clientId);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index d9bd316..6cfb776 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -55,7 +55,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
InvocationStatus.FAILURE;
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
- .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().clientId())
.put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build();
histogram.record(duration.toMillis(), attributes);
}
@@ -74,7 +74,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
consumerGroup = ((SimpleConsumer) client).getConsumerGroup();
}
if (null == consumerGroup) {
- LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.getClientId());
+ LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.clientId());
return;
}
final MessageCommon messageCommon = messageCommons.iterator().next();
@@ -92,7 +92,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
final DoubleHistogram histogram = optionalHistogram.get();
final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+ .put(MetricLabels.CLIENT_ID, client.clientId()).build();
histogram.record(latency, attributes);
}
@@ -103,7 +103,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
}
if (null == consumerGroup) {
- LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.getClientId());
+ LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.clientId());
return;
}
final MessageCommon messageCommon = messageCommons.iterator().next();
@@ -114,7 +114,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
final Duration durationAfterDecoding = optionalDurationAfterDecoding.get();
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+ .put(MetricLabels.CLIENT_ID, client.clientId()).build();
final Optional<DoubleHistogram> optionalHistogram =
clientMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
if (!optionalHistogram.isPresent()) {
@@ -129,7 +129,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
final ClientImpl client = clientMeterProvider.getClient();
if (!(client instanceof PushConsumer)) {
// Should never reach here.
- LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.getClientId());
+ LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.clientId());
return;
}
PushConsumer pushConsumer = (PushConsumer) client;
@@ -138,7 +138,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
InvocationStatus.FAILURE;
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
- .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().clientId())
.put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
.build();
final Optional<DoubleHistogram> optionalHistogram =
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index a7b5357..8b88702 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -46,7 +46,7 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
@@ -87,7 +87,7 @@ public class PushConsumerImplTest extends TestBase {
any(Duration.class)))
.thenReturn(okQueryRouteResponseFuture());
when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class)))
+ any(ClientSessionImpl.class)))
.thenReturn(telemetryRequestObserver);
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
"TestScheduler"));
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 88445d1..e6ede4e 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -59,7 +59,7 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -105,7 +105,7 @@ public class SimpleConsumerImplTest extends TestBase {
any(Duration.class)))
.thenReturn(future0);
when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class)))
+ any(ClientSessionImpl.class)))
.thenReturn(telemetryRequestObserver);
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("TestScheduler"));
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 619538e..f069b52 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -58,7 +58,7 @@ import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.InvocationContext;
@@ -112,7 +112,7 @@ public class ProducerImplTest extends TestBase {
any(Duration.class)))
.thenReturn(future0);
when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class)))
+ any(ClientSessionImpl.class)))
.thenReturn(telemetryRequestObserver);
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
"TestScheduler"));
@@ -145,7 +145,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
- any(Duration.class), any(TelemetrySession.class));
+ any(Duration.class), any(ClientSessionImpl.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
final ListenableFuture<InvocationContext<SendMessageResponse>> future =
okSendMessageResponseFutureWithSingleEntry();
@@ -165,7 +165,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class));
+ any(ClientSessionImpl.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
final ListenableFuture<InvocationContext<SendMessageResponse>> future =
okSendMessageResponseFutureWithSingleEntry();
@@ -177,7 +177,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
- any(Duration.class), any(TelemetrySession.class));
+ any(Duration.class), any(ClientSessionImpl.class));
final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
assertEquals(receipt.getMessageId(), sendReceipt.getMessageId().toString());
shutdown(producerWithoutTopicBinding);
@@ -189,7 +189,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class));
+ any(ClientSessionImpl.class));
final ListenableFuture<InvocationContext<SendMessageResponse>> future = failureSendMessageResponseFuture();
when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
any(Duration.class))).thenReturn(future);