You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/12 08:05:45 UTC

[rocketmq-clients] branch master updated: Add client type into HeartbeatRequest

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 129f63e6 Add client type into HeartbeatRequest
129f63e6 is described below

commit 129f63e6ddcf0bb20d7b4e9ceca1f6dbcbe39dd8
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Dec 12 16:00:42 2022 +0800

    Add client type into HeartbeatRequest
---
 .../apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java   | 6 ------
 .../rocketmq/client/java/impl/consumer/PushConsumerImpl.java      | 4 +++-
 .../rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java    | 8 ++++++++
 .../apache/rocketmq/client/java/impl/producer/ProducerImpl.java   | 3 ++-
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 465fdb23..c4ee9751 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -24,7 +24,6 @@ import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.FilterType;
-import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.Message;
 import apache.rocketmq.v2.NotifyClientTerminationRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
@@ -218,11 +217,6 @@ abstract class ConsumerImpl extends ClientImpl {
         return future;
     }
 
-    @Override
-    public HeartbeatRequest wrapHeartbeatRequest() {
-        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).build();
-    }
-
     protected Resource getProtobufGroup() {
         return Resource.newBuilder().setName(consumerGroup).build();
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 6a07f6e0..277ad437 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
@@ -317,7 +318,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
 
     @Override
     public HeartbeatRequest wrapHeartbeatRequest() {
-        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).build();
+        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
+            .setClientType(ClientType.PUSH_CONSUMER).build();
     }
 
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index a21267d0..4b7ddb13 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -21,6 +21,8 @@ import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Status;
 import com.google.common.math.IntMath;
@@ -152,6 +154,12 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         return new HashMap<>(subscriptionExpressions);
     }
 
+    @Override
+    public HeartbeatRequest wrapHeartbeatRequest() {
+        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
+            .setClientType(ClientType.SIMPLE_CONSUMER).build();
+    }
+
     /**
      * @see SimpleConsumer#receive(int, Duration)
      */
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 820a09ae..1db6e179 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.java.impl.producer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.EndTransactionRequest;
 import apache.rocketmq.v2.EndTransactionResponse;
@@ -190,7 +191,7 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     @Override
     public HeartbeatRequest wrapHeartbeatRequest() {
-        return HeartbeatRequest.newBuilder().build();
+        return HeartbeatRequest.newBuilder().setClientType(ClientType.PRODUCER).build();
     }
 
     /**