You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/04 05:46:03 UTC
[rocketmq-clients] branch master updated: Java: bugfix about forgetting to start consumer service during client startup
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8042fef Java: bugfix about forgetting to start consumer service during client startup
8042fef is described below
commit 8042fefbea9e941e84c2c5ad30a0ca0db7c7bdd4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 4 13:45:44 2022 +0800
Java: bugfix about forgetting to start consumer service during client startup
---
.../org/apache/rocketmq/client/java/impl/ClientImpl.java | 3 ++-
.../rocketmq/client/java/impl/consumer/ConsumeService.java | 7 -------
.../client/java/impl/consumer/FifoConsumeService.java | 14 ++++++++++++++
.../client/java/impl/consumer/PushConsumerImpl.java | 3 +++
.../client/java/impl/consumer/StandardConsumeService.java | 14 ++++++++++++++
.../apache/rocketmq/client/java/message/MessageImpl.java | 2 +-
.../client/java/message/PublishingMessageImpl.java | 5 +++++
.../apache/rocketmq/client/java/metrics/MessageMeter.java | 13 +++++++------
8 files changed, 46 insertions(+), 15 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 21aa92e..a4b144d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -429,7 +429,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
if (topicRouteDataResult.equals(old)) {
// Log if topic route result remains the same.
- LOGGER.info("Topic route result remains the same, topic={}, clientId={}", topic, clientId);
+ LOGGER.info("Topic route result remains the same, topic={}, route={}, clientId={}", topic, old,
+ clientId);
} else {
// Log if topic route result is updated.
LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index 793b425..c4a2ebc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -61,13 +61,6 @@ public abstract class ConsumeService extends Dispatcher {
this.scheduler = scheduler;
}
- @Override
- public void shutDown() throws InterruptedException {
- LOGGER.info("Begin to shutdown the consume service, clientId={}", clientId);
- super.shutDown();
- LOGGER.info("Shutdown the consume service successfully, clientId={}", clientId);
- }
-
public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView) {
return consume(messageView, Duration.ZERO);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index c8beeb2..c8c783e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -46,6 +46,20 @@ class FifoConsumeService extends ConsumeService {
super(clientId, processQueueTable, messageListener, consumptionExecutor, messageInterceptor, scheduler);
}
+ @Override
+ public void startUp() {
+ LOGGER.info("Begin to start the FIFO consume service, clientId={}", clientId);
+ super.startUp();
+ LOGGER.info("Start the FIFO consume service successfully, clientId={}", clientId);
+ }
+
+ @Override
+ public void shutDown() throws InterruptedException {
+ LOGGER.info("Begin to shutdown the FIFO consume service, clientId={}", clientId);
+ super.shutDown();
+ LOGGER.info("Shutdown the FIFO consume service successfully, clientId={}", clientId);
+ }
+
@SuppressWarnings("UnstableApiUsage")
public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) {
if (!iterator.hasNext()) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 94d974f..3eb921e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -155,6 +155,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
messageMeter.setMessageCacheObserver(this);
final ScheduledExecutorService scheduler = clientManager.getScheduler();
this.consumeService = createConsumeService();
+ this.consumeService.startAsync().awaitRunning();
// Scan assignments periodically.
scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
try {
@@ -397,6 +398,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
cacheAssignments.put(topic, latest);
return;
}
+ LOGGER.info("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
+ existed, clientId);
// Process queue may be dropped, need to be synchronized anyway.
syncProcessQueue(topic, latest, filterExpression);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index 52d0c21..452ef8f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -46,6 +46,20 @@ public class StandardConsumeService extends ConsumeService {
super(clientId, processQueueTable, messageListener, consumptionExecutor, messageInterceptor, scheduler);
}
+ @Override
+ public void startUp() {
+ LOGGER.info("Begin to start the standard consume service, clientId={}", clientId);
+ super.startUp();
+ LOGGER.info("Start the standard consume service successfully, clientId={}", clientId);
+ }
+
+ @Override
+ public void shutDown() throws InterruptedException {
+ LOGGER.info("Begin to shutdown the standard consume service, clientId={}", clientId);
+ super.shutDown();
+ LOGGER.info("Shutdown the standard consume service successfully, clientId={}", clientId);
+ }
+
/**
* dispatch message(s) once
*
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 2293c23..093dbad 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -54,7 +54,7 @@ public class MessageImpl implements Message {
* logging warnings already, so we avoid repeating args check here.
*/
MessageImpl(String topic, byte[] body, @Nullable String tag, Collection<String> keys,
- @Nullable String parentTraceContext, @Nullable String messageGroup, @Nullable Long deliveryTimestamp,
+ @Nullable String messageGroup, @Nullable String parentTraceContext, @Nullable Long deliveryTimestamp,
Map<String, String> properties) {
this.topic = topic;
this.body = body;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 7c6ea9c..b419531 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -149,6 +149,11 @@ public class PublishingMessageImpl extends MessageImpl {
this.getTag().ifPresent(systemPropertiesBuilder::setTag);
// Trace context
this.getTraceContext().ifPresent(systemPropertiesBuilder::setTraceContext);
+ // Delivery timestamp
+ this.getDeliveryTimestamp()
+ .ifPresent(millis -> systemPropertiesBuilder.setDeliveryTimestamp(Timestamps.fromMillis(millis)));
+ // Message group
+ this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
final SystemProperties systemProperties = systemPropertiesBuilder.build();
Resource topicResource = Resource.newBuilder().setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 7025a15..c4e6bda 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -99,29 +99,30 @@ public class MessageMeter {
@SuppressWarnings("deprecation")
public synchronized void refresh(Metric metric) {
+ final String clientId = client.getClientId();
try {
if (!metric.isOn()) {
- LOGGER.info("Metric is off, skip refresh");
+ LOGGER.info("Skip metric refresh because metric is off, clientId={}", clientId);
shutdown();
return;
}
final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
if (!optionalEndpoints.isPresent()) {
LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
- client.getClientId());
+ clientId);
return;
}
final Endpoints newMetricEndpoints = optionalEndpoints.get();
if (newMetricEndpoints.equals(metricEndpoints)) {
LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
- client.getClientId(), newMetricEndpoints);
+ clientId, newMetricEndpoints);
return;
}
final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
.sslContext(sslContext)
- .intercept(new AuthInterceptor(client.getClientConfiguration(), client.getClientId()));
+ .intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
if (null != socketAddresses) {
IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
@@ -161,12 +162,12 @@ public class MessageMeter {
final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
- LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),
+ LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", clientId,
metricEndpoints, newMetricEndpoints);
this.reset();
metricEndpoints = newMetricEndpoints;
} catch (Throwable t) {
- LOGGER.error("Exception raised while refreshing message meter, clientId={}", client.getClientId(), t);
+ LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t);
}
}