You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/12 08:05:45 UTC
[rocketmq-clients] branch master updated: Add client type into HeartbeatRequest
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 129f63e6 Add client type into HeartbeatRequest
129f63e6 is described below
commit 129f63e6ddcf0bb20d7b4e9ceca1f6dbcbe39dd8
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Dec 12 16:00:42 2022 +0800
Add client type into HeartbeatRequest
---
.../apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java | 6 ------
.../rocketmq/client/java/impl/consumer/PushConsumerImpl.java | 4 +++-
.../rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java | 8 ++++++++
.../apache/rocketmq/client/java/impl/producer/ProducerImpl.java | 3 ++-
4 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 465fdb23..c4ee9751 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -24,7 +24,6 @@ import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.FilterType;
-import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
@@ -218,11 +217,6 @@ abstract class ConsumerImpl extends ClientImpl {
return future;
}
- @Override
- public HeartbeatRequest wrapHeartbeatRequest() {
- return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).build();
- }
-
protected Resource getProtobufGroup() {
return Resource.newBuilder().setName(consumerGroup).build();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 6a07f6e0..277ad437 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
@@ -317,7 +318,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
@Override
public HeartbeatRequest wrapHeartbeatRequest() {
- return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).build();
+ return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
+ .setClientType(ClientType.PUSH_CONSUMER).build();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index a21267d0..4b7ddb13 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -21,6 +21,8 @@ import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Status;
import com.google.common.math.IntMath;
@@ -152,6 +154,12 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return new HashMap<>(subscriptionExpressions);
}
+ @Override
+ public HeartbeatRequest wrapHeartbeatRequest() {
+ return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
+ .setClientType(ClientType.SIMPLE_CONSUMER).build();
+ }
+
/**
* @see SimpleConsumer#receive(int, Duration)
*/
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 820a09ae..1db6e179 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.java.impl.producer;
import static com.google.common.base.Preconditions.checkNotNull;
+import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.EndTransactionResponse;
@@ -190,7 +191,7 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public HeartbeatRequest wrapHeartbeatRequest() {
- return HeartbeatRequest.newBuilder().build();
+ return HeartbeatRequest.newBuilder().setClientType(ClientType.PRODUCER).build();
}
/**