You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/11/21 04:56:47 UTC

[rocketmq-clients] branch master updated: Delay setting wait time

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new fbb1d0cf Delay setting wait time
fbb1d0cf is described below

commit fbb1d0cf311e8486b0ce40141699eb49451fcea4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Nov 21 10:16:12 2022 +0800

    Delay setting wait time
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  5 ++-
 .../client/java/impl/ClientSessionImpl.java        | 14 +++++---
 .../client/java/impl/ClientSessionImplTest.java    | 40 ++++++++++++++--------
 3 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index aabdd9bb..91d7ba3b 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -62,7 +62,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -376,7 +375,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             if (null != session) {
                 return session;
             }
-            session = new ClientSessionImpl(this, endpoints);
+            session = new ClientSessionImpl(this, clientConfiguration.getRequestTimeout(), endpoints);
             sessionsTable.put(endpoints, session);
             return session;
         } finally {
@@ -388,7 +387,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * Triggered when {@link TopicRouteData} is fetched from remote.
      */
     public void onTopicRouteDataFetched(String topic, TopicRouteData topicRouteData) throws ClientException,
-        ExecutionException, InterruptedException, TimeoutException {
+        ExecutionException, InterruptedException {
         final Set<Endpoints> routeEndpoints = topicRouteData
             .getMessageQueues().stream()
             .map(mq -> mq.getBroker().getEndpoints())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index ba9a5805..2d691512 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -22,12 +22,12 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.misc.ClientId;
@@ -41,17 +41,21 @@ import org.slf4j.LoggerFactory;
 public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
     static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY = Duration.ofSeconds(1);
     private static final Logger log = LoggerFactory.getLogger(ClientSessionImpl.class);
-    private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
+    private static final Duration SETTINGS_INITIALIZATION_TIMEOUT = Duration.ofSeconds(3);
 
     private final ClientSessionHandler sessionHandler;
     private final Endpoints endpoints;
     private final SettableFuture<Settings> settingsSettableFuture;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
-    protected ClientSessionImpl(ClientSessionHandler sessionHandler, Endpoints endpoints) throws ClientException {
+    @SuppressWarnings("UnstableApiUsage")
+    protected ClientSessionImpl(ClientSessionHandler sessionHandler, Duration tolerance, Endpoints endpoints)
+        throws ClientException {
         this.sessionHandler = sessionHandler;
         this.endpoints = endpoints;
         this.settingsSettableFuture = SettableFuture.create();
+        Futures.withTimeout(settingsSettableFuture, SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
+            TimeUnit.MILLISECONDS, sessionHandler.getScheduler());
         this.requestObserver = sessionHandler.telemetry(endpoints, this);
     }
 
@@ -77,9 +81,9 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
         syncSettings0();
     }
 
-    protected void syncSettings() throws TimeoutException, ExecutionException, InterruptedException {
+    protected void syncSettings() throws ExecutionException, InterruptedException {
         this.syncSettings0();
-        settingsSettableFuture.get(SETTINGS_INITIALIZATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        settingsSettableFuture.get();
     }
 
     private void syncSettings0() {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
index e693513b..2c6b9805 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -29,11 +29,12 @@ import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import io.grpc.stub.StreamObserver;
+import java.time.Duration;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -46,13 +47,14 @@ public class ClientSessionImplTest extends TestBase {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void syncSettings() throws ClientException, ExecutionException, InterruptedException, TimeoutException {
+    public void syncSettings() throws ClientException, ExecutionException, InterruptedException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doNothing().when(requestObserver).onNext(any(TelemetryCommand.class));
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class), any(Settings.class));
@@ -70,10 +72,11 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnNextWithRecoverOrphanedTransactionCommand() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
             any(RecoverOrphanedTransactionCommand.class));
@@ -89,10 +92,11 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnNextWithVerifyMessageCommand() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doNothing().when(sessionHandler).onVerifyMessageCommand(any(Endpoints.class),
             any(VerifyMessageCommand.class));
@@ -108,10 +112,11 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnNextWithPrintThreadStackTraceCommand() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doNothing().when(sessionHandler).onPrintThreadStackTraceCommand(any(Endpoints.class),
             any(PrintThreadStackTraceCommand.class));
@@ -127,10 +132,11 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnNextWithUnrecognizedCommand() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class), any(Settings.class));
         Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
@@ -155,11 +161,12 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnError() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
         Mockito.doNothing().when(requestObserver).onCompleted();
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doReturn(true).when(sessionHandler).isRunning();
         Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
@@ -167,7 +174,7 @@ public class ClientSessionImplTest extends TestBase {
         clientSession.onError(e);
         Mockito.verify(sessionHandler, times(1)).isRunning();
         Mockito.verify(requestObserver, times(1)).onCompleted();
-        Mockito.verify(sessionHandler, times(1)).getScheduler();
+        Mockito.verify(sessionHandler, times(2)).getScheduler();
     }
 
     @Test
@@ -175,18 +182,19 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnErrorWithSessionHandlerIsNotRunning() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
         Mockito.doNothing().when(requestObserver).onCompleted();
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doReturn(false).when(sessionHandler).isRunning();
         final Exception e = new Exception();
         clientSession.onError(e);
         Mockito.verify(sessionHandler, times(1)).isRunning();
         Mockito.verify(requestObserver, times(1)).onCompleted();
-        Mockito.verify(sessionHandler, never()).getScheduler();
+        Mockito.verify(sessionHandler, times(1)).getScheduler();
     }
 
     @Test
@@ -194,11 +202,12 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnCompleted() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
         Mockito.doNothing().when(requestObserver).onCompleted();
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doReturn(true).when(sessionHandler).isRunning();
         Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
@@ -206,7 +215,7 @@ public class ClientSessionImplTest extends TestBase {
         clientSession.onCompleted();
         Mockito.verify(sessionHandler, times(1)).isRunning();
         Mockito.verify(requestObserver, times(1)).onCompleted();
-        Mockito.verify(sessionHandler, times(1)).getScheduler();
+        Mockito.verify(sessionHandler, times(2)).getScheduler();
         await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
             .untilAsserted(() -> {
                 Mockito.verify(sessionHandler, times(1)).isEndpointsDeprecated(eq(endpoints));
@@ -219,16 +228,17 @@ public class ClientSessionImplTest extends TestBase {
     public void testOnCompletedWithSessionHandlerIsNotRunning() throws ClientException {
         final Endpoints endpoints = fakeEndpoints();
         final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
         final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
         Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
             any(StreamObserver.class));
         Mockito.doNothing().when(requestObserver).onCompleted();
-        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+        final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
         Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
         Mockito.doReturn(false).when(sessionHandler).isRunning();
         clientSession.onCompleted();
         Mockito.verify(sessionHandler, times(1)).isRunning();
         Mockito.verify(requestObserver, times(1)).onCompleted();
-        Mockito.verify(sessionHandler, never()).getScheduler();
+        Mockito.verify(sessionHandler, times(1)).getScheduler();
     }
 }
\ No newline at end of file