You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/28 09:09:07 UTC
[rocketmq-clients] 01/01: Polish code
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit af6500d98f0568c4f6976d724dc83be57d367b6d
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Jul 28 14:01:22 2022 +0800
Polish code
---
.../rocketmq/client/java/impl/ClientImpl.java | 346 ++++++++++-----------
.../client/java/impl/ClientSessionImpl.java | 137 ++++----
.../java/impl/consumer/ProcessQueueImpl.java | 12 +-
.../java/impl/consumer/PushConsumerImpl.java | 15 +-
.../java/impl/consumer/SimpleConsumerImpl.java | 13 +-
.../impl/consumer/SubscriptionLoadBalancer.java | 37 +--
...ionProcessor.java => ClientSessionHandler.java} | 14 +-
.../client/java/impl/producer/ProducerImpl.java | 29 +-
.../java/impl/producer/PublishingLoadBalancer.java | 42 +--
.../client/java/route/TopicRouteDataResult.java | 125 --------
10 files changed, 289 insertions(+), 481 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index cc93fe9..bc2f68f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
+import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.QueryRouteRequest;
@@ -69,12 +70,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
-import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
import org.apache.rocketmq.client.java.metrics.Metric;
@@ -82,18 +87,16 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcInvocation;
import org.apache.rocketmq.client.java.rpc.Signature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionProcessor,
+public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler,
MessageInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class);
- private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP = Duration.ofSeconds(3);
-
private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(102 * 365);
protected final ClientManager clientManager;
@@ -111,15 +114,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
protected final String clientId;
private volatile ScheduledFuture<?> updateRouteCacheFuture;
- private final ConcurrentMap<String, TopicRouteDataResult> topicRouteResultCache;
+ private final ConcurrentMap<String, TopicRouteData> topicRouteCache;
@GuardedBy("inflightRouteFutureLock")
- private final Map<String /* topic */, Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable;
+ private final Map<String /* topic */, Set<SettableFuture<TopicRouteData>>> inflightRouteFutureTable;
private final Lock inflightRouteFutureLock;
- @GuardedBy("endpointsSessionsLock")
- private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable;
- private final ReadWriteLock endpointsSessionsLock;
+ @GuardedBy("sessionsLock")
+ private final Map<Endpoints, ClientSessionImpl> sessionsTable;
+ private final ReadWriteLock sessionsLock;
@GuardedBy("messageInterceptorsLock")
private final List<MessageInterceptor> messageInterceptors;
@@ -132,13 +135,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
// Generate client id firstly.
this.clientId = Utilities.genClientId();
- this.topicRouteResultCache = new ConcurrentHashMap<>();
+ this.topicRouteCache = new ConcurrentHashMap<>();
this.inflightRouteFutureTable = new ConcurrentHashMap<>();
this.inflightRouteFutureLock = new ReentrantLock();
- this.endpointsSessionTable = new HashMap<>();
- this.endpointsSessionsLock = new ReentrantReadWriteLock();
+ this.sessionsTable = new HashMap<>();
+ this.sessionsLock = new ReentrantReadWriteLock();
this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -180,21 +183,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
// Fetch topic route from remote.
LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
clientId, topics);
- // Aggregate all topic route data futures into a composited future.
- final List<ListenableFuture<TopicRouteDataResult>> futures = topics.stream()
- .map(this::getRouteDataResult)
- .collect(Collectors.toList());
- List<TopicRouteDataResult> results;
- try {
- results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(),
- TimeUnit.NANOSECONDS);
- } catch (Throwable t) {
- LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
- + "topics={}", clientId, topics, t);
- throw new NotFoundException(t);
- }
- for (TopicRouteDataResult result : results) {
- result.checkAndGetTopicRouteData();
+ final List<ListenableFuture<TopicRouteData>> futures =
+ topics.stream().map(this::fetchTopicRoute).collect(Collectors.toList());
+ for (ListenableFuture<TopicRouteData> future : futures) {
+ future.get();
}
LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
clientId, topics);
@@ -282,7 +274,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
@Override
- public TelemetryCommand getSettingsCommand() {
+ public TelemetryCommand settingsCommand() {
final Settings settings = this.getClientSettings().toProtobuf();
return TelemetryCommand.newBuilder().setSettings(settings).build();
}
@@ -301,7 +293,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
@Override
- public ListenableFuture<Void> register() {
+ public boolean isEndpointsDeprecated(Endpoints endpoints) {
+ final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
+ return totalRouteEndpoints.contains(endpoints);
+ }
+
+ @Override
+ public ListenableFuture<Void> awaitSettingSynchronized() {
return Futures.transformAsync(this.getClientSettings().arrivedFuture,
(clientSettings) -> Futures.immediateVoidFuture(), clientCallbackExecutor);
}
@@ -325,7 +323,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
.setThreadStackTrace(threadStackTrace)
.setStatus(status)
.build();
- telemeter(endpoints, telemetryCommand);
+ telemetry(endpoints, telemetryCommand);
} catch (Throwable t) {
LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}",
endpoints, nonce, clientId, t);
@@ -351,7 +349,6 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
public final void onSettingsCommand(Endpoints endpoints, Settings settings) {
final Metric metric = new Metric(settings.getMetric());
clientMeterProvider.reset(metric);
- LOGGER.info("Receive settings from remote, endpoints={}", endpoints);
this.getClientSettings().applySettingsCommand(settings);
}
@@ -365,127 +362,81 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
for (Endpoints endpoints : totalRouteEndpoints) {
try {
- telemeter(endpoints, command);
+ telemetry(endpoints, command);
} catch (Throwable t) {
LOGGER.error("Failed to telemeter settings, clientId={}, endpoints={}", clientId, endpoints, t);
}
}
}
- /**
- * Telemeter command to remote endpoints.
- *
- * @param endpoints remote endpoints to telemeter.
- * @param command command to telemeter.
- */
- public void telemeter(Endpoints endpoints, TelemetryCommand command) {
- final ListenableFuture<ClientSessionImpl> future = registerTelemetrySession(endpoints);
- Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
- @Override
- public void onSuccess(ClientSessionImpl session) {
- try {
- session.publish(command);
- } catch (Throwable t) {
- LOGGER.error("Failed to telemeter command, endpoints={}, command={}", endpoints, command);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOGGER.error("Failed to telemeter command to remote, endpoints={}, command={}", endpoints, command, t);
- }
- }, MoreExecutors.directExecutor());
+ public void telemetry(Endpoints endpoints, TelemetryCommand command) {
+ final ClientSessionImpl clientSession = getClientSession(endpoints);
+ try {
+ clientSession.fireWrite(command);
+ } catch (Throwable t) {
+ LOGGER.error("Failed to fire write telemetry command, clientId={}, endpoints={}", clientId, endpoints, t);
+ }
}
private void releaseClientSessions() {
- endpointsSessionsLock.readLock().lock();
+ sessionsLock.readLock().lock();
try {
- endpointsSessionTable.values().forEach(ClientSessionImpl::release);
+ sessionsTable.values().forEach(ClientSessionImpl::release);
} finally {
- endpointsSessionsLock.readLock().unlock();
+ sessionsLock.readLock().unlock();
}
}
- /**
- * Try to register telemetry session, return it directly if session is existed already.
- */
- public ListenableFuture<ClientSessionImpl> registerTelemetrySession(Endpoints endpoints) {
- final SettableFuture<ClientSessionImpl> future0 = SettableFuture.create();
- endpointsSessionsLock.readLock().lock();
+ public ClientSessionImpl getClientSession(Endpoints endpoints) {
+ sessionsLock.readLock().lock();
try {
- ClientSessionImpl clientSessionImpl = endpointsSessionTable.get(endpoints);
- // Return is directly if session is existed already.
- if (null != clientSessionImpl) {
- future0.set(clientSessionImpl);
- return future0;
+ final ClientSessionImpl session = sessionsTable.get(endpoints);
+ if (null != session) {
+ return session;
}
} finally {
- endpointsSessionsLock.readLock().unlock();
+ sessionsLock.readLock().unlock();
}
- // Future's exception has been logged during the registration.
- final ListenableFuture<ClientSessionImpl> future = new ClientSessionImpl(this, endpoints).register();
- return Futures.transform(future, session -> {
- endpointsSessionsLock.writeLock().lock();
- try {
- ClientSessionImpl existed = endpointsSessionTable.get(endpoints);
- if (null != existed) {
- session.release();
- return existed;
- }
- endpointsSessionTable.put(endpoints, session);
+ sessionsLock.writeLock().lock();
+ try {
+ ClientSessionImpl session = sessionsTable.get(endpoints);
+ if (null != session) {
return session;
- } finally {
- endpointsSessionsLock.writeLock().unlock();
}
- }, MoreExecutors.directExecutor());
+ session = new ClientSessionImpl(this, endpoints);
+ sessionsTable.put(endpoints, session);
+ return session;
+ } finally {
+ sessionsLock.writeLock().unlock();
+ }
+ }
+
+ public ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
+ final ClientSessionImpl clientSession = getClientSession(endpoints);
+ return clientSession.syncSettingsSafely();
}
/**
- * Triggered when {@link TopicRouteDataResult} is fetched from remote.
+ * Triggered when {@link TopicRouteData} is fetched from remote.
*
* <p>Never thrown any exception.
*/
- public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic,
- TopicRouteDataResult topicRouteDataResult) {
- final ListenableFuture<List<ClientSessionImpl>> future =
- Futures.allAsList(topicRouteDataResult.getTopicRouteData()
- .getMessageQueues().stream()
- .map(mq -> mq.getBroker().getEndpoints())
- .collect(Collectors.toSet())
- .stream().map(this::registerTelemetrySession)
- .collect(Collectors.toList()));
- SettableFuture<Void> future0 = SettableFuture.create();
- Futures.addCallback(future, new FutureCallback<List<ClientSessionImpl>>() {
- @Override
- public void onSuccess(List<ClientSessionImpl> sessions) {
- LOGGER.info("Register session successfully, current route will be cached, topic={}, "
- + "topicRouteDataResult={}", topic, topicRouteDataResult);
- final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
- if (topicRouteDataResult.equals(old)) {
- // Log if topic route result remains the same.
- LOGGER.info("Topic route result remains the same, topic={}, route={}, clientId={}", topic, old,
- clientId);
- } else {
- // Log if topic route result is updated.
- LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
- old, topicRouteDataResult);
- }
- future0.setFuture(Futures.immediateVoidFuture());
- onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Note: Topic route would not be updated if failed to register session.
- LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, "
- + "topicRouteDataResult={}", topic, topicRouteDataResult);
- future0.setException(t);
- }
- }, MoreExecutors.directExecutor());
- return future0;
+ public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic, TopicRouteData topicRouteData) {
+ final List<ListenableFuture<Void>> futures = topicRouteData
+ .getMessageQueues().stream()
+ .map(mq -> mq.getBroker().getEndpoints())
+ .collect(Collectors.toSet())
+ .stream().map(this::syncSettingsSafely)
+ .collect(Collectors.toList());
+ // TODO: Record exception.
+ return Futures.whenAllSucceed(futures).callAsync(() -> {
+ topicRouteCache.put(topic, topicRouteData);
+ onTopicRouteDataUpdate0(topic, topicRouteData);
+ return Futures.immediateFuture(topicRouteData);
+ }, clientCallbackExecutor);
}
- public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
+ public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
}
/**
@@ -506,7 +457,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
.setStatus(status)
.build();
try {
- telemeter(endpoints, telemetryCommand);
+ telemetry(endpoints, telemetryCommand);
} catch (Throwable t) {
LOGGER.warn("Failed to send message verification result, clientId={}", clientId, t);
}
@@ -520,20 +471,17 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
*/
@Override
public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
- LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
+ LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, clientId={}, "
+ "command={}", clientId, command);
}
private void updateRouteCache() {
LOGGER.info("Start to update route cache for a new round, clientId={}", clientId);
- topicRouteResultCache.keySet().forEach(topic -> {
- // Set timeout for future on purpose.
- final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic),
- TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
- Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
+ topicRouteCache.keySet().forEach(topic -> {
+ final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
+ Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@Override
- public void onSuccess(TopicRouteDataResult topicRouteDataResult) {
- onTopicRouteDataResultFetched(topic, topicRouteDataResult);
+ public void onSuccess(TopicRouteData topicRouteData) {
}
@Override
@@ -648,7 +596,25 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
public void doStats() {
}
- private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final String topic) {
+ private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
+ final ListenableFuture<TopicRouteData> future = Futures.transformAsync(fetchTopicRoute0(topic),
+ topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), MoreExecutors.directExecutor());
+ Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
+ @Override
+ public void onSuccess(TopicRouteData topicRouteData) {
+ LOGGER.info("Fetch topic route successfully, clientId={}, topic={}, topicRouteData={}", clientId,
+ topic, topicRouteData);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOGGER.error("Failed to fetch topic route, clientId={}, topic={}", clientId, topic, t);
+ }
+ }, MoreExecutors.directExecutor());
+ return future;
+ }
+
+ private ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
try {
Resource topicResource = Resource.newBuilder().setName(topic).build();
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
@@ -656,17 +622,37 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
final Metadata metadata = sign();
final ListenableFuture<RpcInvocation<QueryRouteResponse>> future =
clientManager.queryRoute(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
- return Futures.transform(future, invocation -> {
+ return Futures.transformAsync(future, invocation -> {
final QueryRouteResponse response = invocation.getResponse();
final String requestId = invocation.getContext().getRequestId();
final Status status = response.getStatus();
+ final String statusMessage = status.getMessage();
final Code code = status.getCode();
- if (Code.OK != code) {
- LOGGER.error("Exception raised while fetch topic route from remote, topic={}, " +
- "clientId={}, requestId={}, endpoints={}, code={}, status message=[{}]", topic, clientId,
- requestId, endpoints, code, status.getMessage());
+ final int codeNumber = code.getNumber();
+ switch (code) {
+ case OK:
+ break;
+ case BAD_REQUEST:
+ case ILLEGAL_ACCESS_POINT:
+ case ILLEGAL_TOPIC:
+ case CLIENT_ID_REQUIRED:
+ throw new BadRequestException(codeNumber, requestId, statusMessage);
+ case NOT_FOUND:
+ case TOPIC_NOT_FOUND:
+ throw new NotFoundException(codeNumber, requestId, statusMessage);
+ case TOO_MANY_REQUESTS:
+ throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
+ case INTERNAL_ERROR:
+ case INTERNAL_SERVER_ERROR:
+ throw new InternalErrorException(codeNumber, requestId, statusMessage);
+ case PROXY_TIMEOUT:
+ throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
+ default:
+ throw new UnsupportedException(codeNumber, requestId, statusMessage);
}
- return new TopicRouteDataResult(invocation);
+ final List<MessageQueue> messageQueuesList = response.getMessageQueuesList();
+ final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
+ return Futures.immediateFuture(topicRouteData);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
return Futures.immediateFailedFuture(t);
@@ -675,29 +661,29 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
protected Set<Endpoints> getTotalRouteEndpoints() {
Set<Endpoints> totalRouteEndpoints = new HashSet<>();
- for (TopicRouteDataResult result : topicRouteResultCache.values()) {
- totalRouteEndpoints.addAll(result.getTopicRouteData().getTotalEndpoints());
+ for (TopicRouteData topicRouteData : topicRouteCache.values()) {
+ totalRouteEndpoints.addAll(topicRouteData.getTotalEndpoints());
}
return totalRouteEndpoints;
}
- protected ListenableFuture<TopicRouteDataResult> getRouteDataResult(final String topic) {
- SettableFuture<TopicRouteDataResult> future0 = SettableFuture.create();
- TopicRouteDataResult topicRouteDataResult = topicRouteResultCache.get(topic);
+ protected ListenableFuture<TopicRouteData> getRouteData(final String topic) {
+ SettableFuture<TopicRouteData> future0 = SettableFuture.create();
+ TopicRouteData topicRouteData = topicRouteCache.get(topic);
// If route result was cached before, get it directly.
- if (null != topicRouteDataResult) {
- future0.set(topicRouteDataResult);
+ if (null != topicRouteData) {
+ future0.set(topicRouteData);
return future0;
}
inflightRouteFutureLock.lock();
try {
// If route was fetched by last in-flight request, get it directly.
- topicRouteDataResult = topicRouteResultCache.get(topic);
- if (null != topicRouteDataResult) {
- future0.set(topicRouteDataResult);
+ topicRouteData = topicRouteCache.get(topic);
+ if (null != topicRouteData) {
+ future0.set(topicRouteData);
return future0;
}
- Set<SettableFuture<TopicRouteDataResult>> inflightFutures = inflightRouteFutureTable.get(topic);
+ Set<SettableFuture<TopicRouteData>> inflightFutures = inflightRouteFutureTable.get(topic);
// Request is in-flight, return future directly.
if (null != inflightFutures) {
inflightFutures.add(future0);
@@ -709,52 +695,48 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
} finally {
inflightRouteFutureLock.unlock();
}
- final ListenableFuture<TopicRouteDataResult> future = fetchTopicRoute(topic);
- Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
+ final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
+ Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@Override
- public void onSuccess(TopicRouteDataResult result) {
- final ListenableFuture<Void> updateFuture = onTopicRouteDataResultFetched(topic, result);
- // TODO: all succeed?
- Futures.whenAllSucceed(updateFuture).run(() -> {
- inflightRouteFutureLock.lock();
- try {
- final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
- inflightRouteFutureTable.remove(topic);
- if (null == newFutureSet) {
- // Should never reach here.
- LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
- clientId);
- return;
- }
- LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
- + "size={}, clientId={}", topic, newFutureSet.size(), clientId);
- for (SettableFuture<TopicRouteDataResult> newFuture : newFutureSet) {
- newFuture.set(result);
- }
- } catch (Throwable t) {
+ public void onSuccess(TopicRouteData topicRouteData) {
+ inflightRouteFutureLock.lock();
+ try {
+ final Set<SettableFuture<TopicRouteData>> newFutureSet =
+ inflightRouteFutureTable.remove(topic);
+ if (null == newFutureSet) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
- clientId, t);
- } finally {
- inflightRouteFutureLock.unlock();
+ LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
+ clientId);
+ return;
}
- }, MoreExecutors.directExecutor());
+ LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
+ + "size={}, clientId={}", topic, newFutureSet.size(), clientId);
+ for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
+ newFuture.set(topicRouteData);
+ }
+ } catch (Throwable t) {
+ // Should never reach here.
+ LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
+ clientId, t);
+ } finally {
+ inflightRouteFutureLock.unlock();
+ }
}
@Override
public void onFailure(Throwable t) {
inflightRouteFutureLock.lock();
try {
- final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
+ final Set<SettableFuture<TopicRouteData>> newFutureSet =
inflightRouteFutureTable.remove(topic);
if (null == newFutureSet) {
// Should never reach here.
LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
return;
}
- LOGGER.error("Failed to fetch topic route, topic={}, in-flight route future " +
+ LOGGER.debug("Failed to fetch topic route, topic={}, in-flight route future " +
"size={}, clientId={}", topic, newFutureSet.size(), clientId, t);
- for (SettableFuture<TopicRouteDataResult> future : newFutureSet) {
+ for (SettableFuture<TopicRouteData> future : newFutureSet) {
future.setException(t);
}
} finally {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index d6a19c2..90402eb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -22,15 +22,11 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.stub.StreamObserver;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,144 +34,125 @@ import org.slf4j.LoggerFactory;
/**
* Telemetry session is constructed before first communication between client and remote route endpoints.
*/
-@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionImpl.class);
- private final ClientSessionProcessor processor;
+ private final ClientSessionHandler handler;
private final Endpoints endpoints;
- private final ReadWriteLock observerLock;
- private StreamObserver<TelemetryCommand> requestObserver = null;
+ private volatile StreamObserver<TelemetryCommand> requestObserver;
- protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints endpoints) {
- this.processor = processor;
+ protected ClientSessionImpl(ClientSessionHandler handler, Endpoints endpoints) {
+ this.handler = handler;
this.endpoints = endpoints;
- this.observerLock = new ReentrantReadWriteLock();
+ renewRequestObserver();
}
- protected ListenableFuture<ClientSessionImpl> register() {
- ListenableFuture<ClientSessionImpl> future;
+ private void renewRequestObserver() {
try {
- final TelemetryCommand command = processor.getSettingsCommand();
- this.publish(command);
- future = Futures.transform(processor.register(), input -> this, MoreExecutors.directExecutor());
+ if (handler.isEndpointsDeprecated(endpoints)) {
+ LOGGER.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}", endpoints);
+ return;
+ }
+ this.requestObserver = handler.telemetry(endpoints, this);
} catch (Throwable t) {
- future = Futures.immediateFailedFuture(t);
+ handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS);
}
- final String clientId = processor.clientId();
- Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
- @Override
- public void onSuccess(ClientSessionImpl session) {
- LOGGER.info("Register client session successfully, endpoints={}, clientId={}", endpoints, clientId);
- }
+ }
- @Override
- public void onFailure(Throwable t) {
- LOGGER.error("Failed to register client session, endpoints={}, clientId={}", endpoints, clientId, t);
- release();
- }
- }, MoreExecutors.directExecutor());
- return future;
+ protected ListenableFuture<Void> syncSettingsSafely() {
+ try {
+ final TelemetryCommand settings = handler.settingsCommand();
+ fireWrite(settings);
+ return handler.awaitSettingSynchronized();
+ } catch (Throwable t) {
+ return Futures.immediateFailedFuture(t);
+ }
}
/**
* Release telemetry session.
*/
public void release() {
- this.observerLock.writeLock().lock();
+ if (null == requestObserver) {
+ return;
+ }
try {
- if (null != requestObserver) {
- try {
- requestObserver.onCompleted();
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
- }
- requestObserver = null;
- }
- } finally {
- this.observerLock.writeLock().unlock();
+ requestObserver.onCompleted();
+ } catch (Throwable ignore) {
+ // Ignore exception on purpose.
}
}
- /**
- * Telemeter command to remote.
- *
- * @param command appointed command to telemeter
- */
- public void publish(TelemetryCommand command) throws ClientException {
- this.observerLock.readLock().lock();
- try {
- if (null != requestObserver) {
- requestObserver.onNext(command);
- return;
- }
- } finally {
- this.observerLock.readLock().unlock();
- }
- this.observerLock.writeLock().lock();
- try {
- if (null == requestObserver) {
- this.requestObserver = processor.telemetry(endpoints, this);
- }
- requestObserver.onNext(command);
- } finally {
- this.observerLock.writeLock().unlock();
+ public void fireWrite(TelemetryCommand command) {
+ if (null == requestObserver) {
+ LOGGER.error("Request observer does not exist, ignore current command, endpoints={}, command={}",
+ endpoints, command);
+ return;
}
+ requestObserver.onNext(command);
}
@Override
public void onNext(TelemetryCommand command) {
+ final String clientId = handler.clientId();
try {
switch (command.getCommandCase()) {
case SETTINGS: {
final Settings settings = command.getSettings();
- LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
- processor.clientId());
- processor.onSettingsCommand(endpoints, settings);
+ LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, clientId);
+ handler.onSettingsCommand(endpoints, settings);
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
command.getRecoverOrphanedTransactionCommand();
LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
- + "clientId={}", endpoints, processor.clientId());
- processor.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
+ + "clientId={}", endpoints, clientId);
+ handler.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
break;
}
case VERIFY_MESSAGE_COMMAND: {
final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
- endpoints, processor.clientId());
- processor.onVerifyMessageCommand(endpoints, verifyMessageCommand);
+ endpoints, clientId);
+ handler.onVerifyMessageCommand(endpoints, verifyMessageCommand);
break;
}
case PRINT_THREAD_STACK_TRACE_COMMAND: {
final PrintThreadStackTraceCommand printThreadStackTraceCommand =
command.getPrintThreadStackTraceCommand();
LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
- endpoints, processor.clientId());
- processor.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
+ endpoints, clientId);
+ handler.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
break;
}
default:
LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
- endpoints, command, processor.clientId());
+ endpoints, command, clientId);
}
} catch (Throwable t) {
LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
- + "clientId={}", command, processor.clientId(), t);
+ + "clientId={}", command, clientId, t);
}
}
@Override
public void onError(Throwable throwable) {
LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}",
- processor.clientId(), endpoints, throwable);
- this.release();
+ handler.clientId(), endpoints, throwable);
+ release();
+ if (!handler.isRunning()) {
+ return;
+ }
+ handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS);
}
@Override
public void onCompleted() {
- this.release();
+ release();
+ if (!handler.isRunning()) {
+ return;
+ }
+ handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index d4271bf..965507c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -497,18 +497,18 @@ class ProcessQueueImpl implements ProcessQueue {
forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
return;
}
+ // Set result if message is forwarded successfully.
+ future0.setFuture(Futures.immediateVoidFuture());
// Log retries.
if (1 < attempt) {
LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+ "attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
attempt, messageId, mq, endpoints, requestId);
- } else {
- LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
- + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq,
- endpoints, requestId);
+ return;
}
- // Set result if message is forwarded successfully.
- future0.setFuture(Futures.immediateVoidFuture());
+ LOGGER.info("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+ + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq,
+ endpoints, requestId);
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 2c18a92..86bef80 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -77,7 +77,7 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -234,9 +234,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
this.state(), clientId);
throw new IllegalStateException("Push consumer is not running now");
}
- final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
- TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
- topicRouteDataResult.checkAndGetTopicRouteData();
+ final ListenableFuture<TopicRouteData> future = getRouteData(topic);
+ handleClientFuture(future);
subscriptionExpressions.put(topic, filterExpression);
return this;
}
@@ -257,9 +256,9 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
}
private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic) {
- final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
- return Futures.transformAsync(future, topicRouteDataResult -> {
- Endpoints endpoints = topicRouteDataResult.checkAndGetTopicRouteData().pickEndpointsToQueryAssignments();
+ final ListenableFuture<TopicRouteData> future = getRouteData(topic);
+ return Futures.transformAsync(future, topicRouteData -> {
+ Endpoints endpoints = topicRouteData.pickEndpointsToQueryAssignments();
return Futures.immediateFuture(endpoints);
}, MoreExecutors.directExecutor());
}
@@ -513,7 +512,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
.setStatus(status)
.build();
try {
- telemeter(endpoints, command);
+ telemetry(endpoints, command);
} catch (Throwable t) {
LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, "
+ "messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index d90edf8..d8ce5d8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -55,7 +55,7 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,9 +130,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
this.state(), clientId);
throw new IllegalStateException("Simple consumer is not running now");
}
- final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
- TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
- topicRouteDataResult.checkAndGetTopicRouteData();
+ final ListenableFuture<TopicRouteData> future = getRouteData(topic);
+ handleClientFuture(future);
subscriptionExpressions.put(topic, filterExpression);
return this;
}
@@ -359,9 +358,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return simpleConsumerSettings;
}
- public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
+ public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
final SubscriptionLoadBalancer subscriptionLoadBalancer =
- new SubscriptionLoadBalancer(topicRouteDataResult);
+ new SubscriptionLoadBalancer(topicRouteData);
subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
}
@@ -372,7 +371,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
future0.set(result);
return future0;
}
- final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
+ final ListenableFuture<TopicRouteData> future = getRouteData(topic);
return Futures.transform(future, topicRouteDataResult -> {
final SubscriptionLoadBalancer subscriptionLoadBalancer =
new SubscriptionLoadBalancer(topicRouteDataResult);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index 21610b2..012d441 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -17,20 +17,20 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.math.IntMath;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.annotation.concurrent.Immutable;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
@Immutable
public class SubscriptionLoadBalancer {
- private final TopicRouteDataResult topicRouteDataResult;
/**
* Index for round-robin.
*/
@@ -40,30 +40,19 @@ public class SubscriptionLoadBalancer {
*/
private final ImmutableList<MessageQueueImpl> messageQueues;
- public SubscriptionLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
- this.topicRouteDataResult = topicRouteDataResult;
+ public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
- final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
- if (!topicRouteDataResult.ok()) {
- this.messageQueues = builder.build();
- return;
+ final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
+ .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() &&
+ Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
+ .collect(Collectors.toList());
+ if (mqs.isEmpty()) {
+ throw new IllegalArgumentException("No readable message queue found, topiRouteData=" + topicRouteData);
}
- for (MessageQueueImpl messageQueue : topicRouteDataResult.getTopicRouteData().getMessageQueues()) {
- if (!messageQueue.getPermission().isReadable() ||
- Utilities.MASTER_BROKER_ID != messageQueue.getBroker().getId()) {
- continue;
- }
- builder.add(messageQueue);
- }
- this.messageQueues = builder.build();
+ this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
}
- public MessageQueueImpl takeMessageQueue() throws ClientException {
- topicRouteDataResult.checkAndGetTopicRouteData();
- if (messageQueues.isEmpty()) {
- // Should never reach here.
- throw new NotFoundException("Failed to take message queue due to readable message queue doesn't exist");
- }
+ public MessageQueueImpl takeMessageQueue() {
final int next = index.getAndIncrement();
return messageQueues.get(IntMath.mod(next, messageQueues.size()));
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
similarity index 83%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 8bdc019..817e24f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -24,15 +24,23 @@ import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.route.Endpoints;
-public interface ClientSessionProcessor {
- ListenableFuture<Void> register();
+public interface ClientSessionHandler {
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ boolean isRunning();
+
+ ScheduledExecutorService getScheduler();
+
+ boolean isEndpointsDeprecated(Endpoints endpoints);
+
+ ListenableFuture<Void> awaitSettingSynchronized();
String clientId();
- TelemetryCommand getSettingsCommand();
+ TelemetryCommand settingsCommand();
StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> observer)
throws ClientException;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index bb4ae49..08483f0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ class ProducerImpl extends ClientImpl implements Producer {
protected final ProducerSettings producerSettings;
private final TransactionChecker checker;
- private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataResultCache;
+ private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataCache;
/**
* The caller is supposed to have validated the arguments and handled throwing exception or
@@ -102,7 +102,7 @@ class ProducerImpl extends ClientImpl implements Producer {
this.producerSettings = new ProducerSettings(clientId, endpoints, retryPolicy,
clientConfiguration.getRequestTimeout(), topics);
this.checker = checker;
- this.publishingRouteDataResultCache = new ConcurrentHashMap<>();
+ this.publishingRouteDataCache = new ConcurrentHashMap<>();
}
@Override
@@ -540,26 +540,21 @@ class ProducerImpl extends ClientImpl implements Producer {
}
@Override
- public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
+ public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
final PublishingLoadBalancer publishingLoadBalancer =
- new PublishingLoadBalancer(topicRouteDataResult);
- publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
+ new PublishingLoadBalancer(topicRouteData);
+ publishingRouteDataCache.put(topic, publishingLoadBalancer);
}
private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) {
- SettableFuture<PublishingLoadBalancer> future0 = SettableFuture.create();
- final PublishingLoadBalancer result = publishingRouteDataResultCache.get(topic);
+ final PublishingLoadBalancer result = publishingRouteDataCache.get(topic);
if (null != result) {
- future0.set(result);
- return future0;
+ return Futures.immediateFuture(result);
}
- return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
- SettableFuture<PublishingLoadBalancer> future = SettableFuture.create();
- final PublishingLoadBalancer publishingLoadBalancer =
- new PublishingLoadBalancer(topicRouteDataResult);
- publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
- future.set(publishingLoadBalancer);
- return future;
+ return Futures.transformAsync(getRouteData(topic), topicRouteDataResult -> {
+ final PublishingLoadBalancer loadBalancer = new PublishingLoadBalancer(topicRouteDataResult);
+ publishingRouteDataCache.put(topic, loadBalancer);
+ return Futures.immediateFuture(loadBalancer);
}, MoreExecutors.directExecutor());
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index 9b326de..feb9616 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.client.java.impl.producer;
import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.google.common.math.IntMath;
@@ -28,19 +29,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.annotation.concurrent.Immutable;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Broker;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
@Immutable
public class PublishingLoadBalancer {
- private final TopicRouteDataResult topicRouteDataResult;
/**
* Index for round-robin.
*/
@@ -50,40 +49,25 @@ public class PublishingLoadBalancer {
*/
private final ImmutableList<MessageQueueImpl> messageQueues;
- public PublishingLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
- this.topicRouteDataResult = topicRouteDataResult;
+ public PublishingLoadBalancer(TopicRouteData topicRouteData) {
this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
- final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
- if (!topicRouteDataResult.ok()) {
- this.messageQueues = builder.build();
- return;
- }
- for (MessageQueueImpl messageQueue : topicRouteDataResult.getTopicRouteData().getMessageQueues()) {
- if (!messageQueue.getPermission().isWritable() ||
- Utilities.MASTER_BROKER_ID != messageQueue.getBroker().getId()) {
- continue;
- }
- builder.add(messageQueue);
- }
- this.messageQueues = builder.build();
- }
-
- private void preconditionCheckBeforeTakingMessageQueue() throws ClientException {
- topicRouteDataResult.checkAndGetTopicRouteData();
- if (messageQueues.isEmpty()) {
- throw new NotFoundException("Failed to take message due to writable message queue doesn't exist");
+ final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
+ .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
+ Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
+ .collect(Collectors.toList());
+ if (mqs.isEmpty()) {
+ throw new IllegalArgumentException("No writable message queue found, topiRouteData=" + topicRouteData);
}
+ this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
}
- public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) throws ClientException {
- preconditionCheckBeforeTakingMessageQueue();
+ public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) {
final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong();
final int index = LongMath.mod(hashCode, messageQueues.size());
return messageQueues.get(index);
}
- public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) throws ClientException {
- preconditionCheckBeforeTakingMessageQueue();
+ public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) {
int next = index.getAndIncrement();
List<MessageQueueImpl> candidates = new ArrayList<>();
Set<String> candidateBrokerNames = new HashSet<>();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
deleted file mode 100644
index 94adee4..0000000
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.route;
-
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.MessageQueue;
-import apache.rocketmq.v2.QueryRouteResponse;
-import apache.rocketmq.v2.Status;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import java.util.List;
-import javax.annotation.concurrent.Immutable;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.BadRequestException;
-import org.apache.rocketmq.client.java.exception.InternalErrorException;
-import org.apache.rocketmq.client.java.exception.NotFoundException;
-import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
-import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
-import org.apache.rocketmq.client.java.exception.UnsupportedException;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
-
-/**
- * Result topic route data fetched from remote.
- */
-@Immutable
-public class TopicRouteDataResult {
- private final TopicRouteData topicRouteData;
- private final ClientException exception;
-
- public TopicRouteDataResult(RpcInvocation<QueryRouteResponse> invocation) {
- final QueryRouteResponse response = invocation.getResponse();
- final String requestId = invocation.getContext().getRequestId();
- final List<MessageQueue> messageQueuesList = response.getMessageQueuesList();
- final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
- final Status status = response.getStatus();
- this.topicRouteData = topicRouteData;
- final Code code = status.getCode();
- final int codeNumber = code.getNumber();
- final String statusMessage = status.getMessage();
- switch (code) {
- case OK:
- this.exception = null;
- break;
- case BAD_REQUEST:
- case ILLEGAL_ACCESS_POINT:
- case ILLEGAL_TOPIC:
- case CLIENT_ID_REQUIRED:
- this.exception = new BadRequestException(codeNumber, requestId, statusMessage);
- break;
- case NOT_FOUND:
- case TOPIC_NOT_FOUND:
- this.exception = new NotFoundException(codeNumber, requestId, statusMessage);
- break;
- case TOO_MANY_REQUESTS:
- this.exception = new TooManyRequestsException(codeNumber, requestId, statusMessage);
- break;
- case INTERNAL_ERROR:
- case INTERNAL_SERVER_ERROR:
- this.exception = new InternalErrorException(codeNumber, requestId, statusMessage);
- break;
- case PROXY_TIMEOUT:
- this.exception = new ProxyTimeoutException(codeNumber, requestId, statusMessage);
- break;
- default:
- this.exception = new UnsupportedException(codeNumber, requestId, statusMessage);
- }
- }
-
- public TopicRouteData getTopicRouteData() {
- return topicRouteData;
- }
-
- public TopicRouteData checkAndGetTopicRouteData() throws ClientException {
- if (null != exception) {
- throw exception;
- }
- return topicRouteData;
- }
-
- public boolean ok() {
- return null == exception;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TopicRouteDataResult result = (TopicRouteDataResult) o;
- return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(exception, result.exception);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(topicRouteData, exception);
- }
-
- @Override
- public String toString() {
- final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this)
- .add("topicRouteData", this.topicRouteData);
- if (null == exception) {
- return helper.toString();
- }
- return helper.add("exception", this.exception).toString();
- }
-}