You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/04 05:46:03 UTC

[rocketmq-clients] branch master updated: Java: bugfix about forgetting to start consumer service during client startup

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8042fef  Java: bugfix about forgetting to start consumer service during client startup
8042fef is described below

commit 8042fefbea9e941e84c2c5ad30a0ca0db7c7bdd4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 4 13:45:44 2022 +0800

    Java: bugfix about forgetting to start consumer service during client startup
---
 .../org/apache/rocketmq/client/java/impl/ClientImpl.java   |  3 ++-
 .../rocketmq/client/java/impl/consumer/ConsumeService.java |  7 -------
 .../client/java/impl/consumer/FifoConsumeService.java      | 14 ++++++++++++++
 .../client/java/impl/consumer/PushConsumerImpl.java        |  3 +++
 .../client/java/impl/consumer/StandardConsumeService.java  | 14 ++++++++++++++
 .../apache/rocketmq/client/java/message/MessageImpl.java   |  2 +-
 .../client/java/message/PublishingMessageImpl.java         |  5 +++++
 .../apache/rocketmq/client/java/metrics/MessageMeter.java  | 13 +++++++------
 8 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 21aa92e..a4b144d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -429,7 +429,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
                 if (topicRouteDataResult.equals(old)) {
                     // Log if topic route result remains the same.
-                    LOGGER.info("Topic route result remains the same, topic={}, clientId={}", topic, clientId);
+                    LOGGER.info("Topic route result remains the same, topic={}, route={}, clientId={}", topic, old,
+                        clientId);
                 } else {
                     // Log if topic route result is updated.
                     LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index 793b425..c4a2ebc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -61,13 +61,6 @@ public abstract class ConsumeService extends Dispatcher {
         this.scheduler = scheduler;
     }
 
-    @Override
-    public void shutDown() throws InterruptedException {
-        LOGGER.info("Begin to shutdown the consume service, clientId={}", clientId);
-        super.shutDown();
-        LOGGER.info("Shutdown the consume service successfully, clientId={}", clientId);
-    }
-
     public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView) {
         return consume(messageView, Duration.ZERO);
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index c8beeb2..c8c783e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -46,6 +46,20 @@ class FifoConsumeService extends ConsumeService {
         super(clientId, processQueueTable, messageListener, consumptionExecutor, messageInterceptor, scheduler);
     }
 
+    @Override
+    public void startUp() {
+        LOGGER.info("Begin to start the FIFO consume service, clientId={}", clientId);
+        super.startUp(); // Delegate the actual start-up to the parent class.
+        LOGGER.info("Start the FIFO consume service successfully, clientId={}", clientId);
+    }
+
+    @Override
+    public void shutDown() throws InterruptedException {
+        LOGGER.info("Begin to shutdown the FIFO consume service, clientId={}", clientId);
+        super.shutDown(); // Delegate to the parent; the success log below is skipped if this throws.
+        LOGGER.info("Shutdown the FIFO consume service successfully, clientId={}", clientId);
+    }
+
     @SuppressWarnings("UnstableApiUsage")
     public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) {
         if (!iterator.hasNext()) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 94d974f..3eb921e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -155,6 +155,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             messageMeter.setMessageCacheObserver(this);
             final ScheduledExecutorService scheduler = clientManager.getScheduler();
             this.consumeService = createConsumeService();
+            this.consumeService.startAsync().awaitRunning();
             // Scan assignments periodically.
             scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
                 try {
@@ -397,6 +398,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                             cacheAssignments.put(topic, latest);
                             return;
                         }
+                        LOGGER.info("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
+                            existed, clientId);
                         // Process queue may be dropped, need to be synchronized anyway.
                         syncProcessQueue(topic, latest, filterExpression);
                     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index 52d0c21..452ef8f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -46,6 +46,20 @@ public class StandardConsumeService extends ConsumeService {
         super(clientId, processQueueTable, messageListener, consumptionExecutor, messageInterceptor, scheduler);
     }
 
+    @Override
+    public void startUp() {
+        LOGGER.info("Begin to start the standard consume service, clientId={}", clientId);
+        super.startUp(); // Delegate the actual start-up to the parent class.
+        LOGGER.info("Start the standard consume service successfully, clientId={}", clientId);
+    }
+
+    @Override
+    public void shutDown() throws InterruptedException {
+        LOGGER.info("Begin to shutdown the standard consume service, clientId={}", clientId);
+        super.shutDown(); // Delegate to the parent; the success log below is skipped if this throws.
+        LOGGER.info("Shutdown the standard consume service successfully, clientId={}", clientId);
+    }
+
     /**
      * dispatch message(s) once
      *
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 2293c23..093dbad 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -54,7 +54,7 @@ public class MessageImpl implements Message {
      * logging warnings already, so we avoid repeating args check here.
      */
     MessageImpl(String topic, byte[] body, @Nullable String tag, Collection<String> keys,
-        @Nullable String parentTraceContext, @Nullable String messageGroup, @Nullable Long deliveryTimestamp,
+        @Nullable String messageGroup, @Nullable String parentTraceContext, @Nullable Long deliveryTimestamp,
         Map<String, String> properties) {
         this.topic = topic;
         this.body = body;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 7c6ea9c..b419531 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -149,6 +149,11 @@ public class PublishingMessageImpl extends MessageImpl {
         this.getTag().ifPresent(systemPropertiesBuilder::setTag);
         // Trace context
         this.getTraceContext().ifPresent(systemPropertiesBuilder::setTraceContext);
+        // Delivery timestamp
+        this.getDeliveryTimestamp()
+            .ifPresent(millis -> systemPropertiesBuilder.setDeliveryTimestamp(Timestamps.fromMillis(millis)));
+        // Message group
+        this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
         final SystemProperties systemProperties = systemPropertiesBuilder.build();
         Resource topicResource = Resource.newBuilder().setName(getTopic()).build();
         return apache.rocketmq.v2.Message.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 7025a15..c4e6bda 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -99,29 +99,30 @@ public class MessageMeter {
 
     @SuppressWarnings("deprecation")
     public synchronized void refresh(Metric metric) {
+        final String clientId = client.getClientId();
         try {
             if (!metric.isOn()) {
-                LOGGER.info("Metric is off, skip refresh");
+                LOGGER.info("Skip metric refresh because metric is off, clientId={}", clientId);
                 shutdown();
                 return;
             }
             final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
             if (!optionalEndpoints.isPresent()) {
                 LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
-                    client.getClientId());
+                    clientId);
                 return;
             }
             final Endpoints newMetricEndpoints = optionalEndpoints.get();
             if (newMetricEndpoints.equals(metricEndpoints)) {
                 LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
-                    client.getClientId(), newMetricEndpoints);
+                    clientId, newMetricEndpoints);
                 return;
             }
             final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
                 .build();
             final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
                 .sslContext(sslContext)
-                .intercept(new AuthInterceptor(client.getClientConfiguration(), client.getClientId()));
+                .intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
             final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
             if (null != socketAddresses) {
                 IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
@@ -161,12 +162,12 @@ public class MessageMeter {
 
             final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
             meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
-            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),
+            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", clientId,
                 metricEndpoints, newMetricEndpoints);
             this.reset();
             metricEndpoints = newMetricEndpoints;
         } catch (Throwable t) {
-            LOGGER.error("Exception raised while refreshing message meter, clientId={}", client.getClientId(), t);
+            LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t);
         }
     }