You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/07 06:03:00 UTC
[incubator-eventmesh] branch master updated: Fix the problem that the client does not resubscribe after the gRPC service is restarted.
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 94545dcb Fix the problem that the client does not resubscribe after the gRPC service is restarted.
new 7ffb4870 Merge pull request #1454 from mytang0/grpc_client_resubscribe
94545dcb is described below
commit 94545dcbc96256e65ca95d7475d11bd8d3a11a24
Author: 鹿鸣 <me...@163.com>
AuthorDate: Thu Sep 29 13:57:32 2022 +0800
Fix the problem that the client does not resubscribe after the gRPC service is restarted.
---
.../common/protocol/grpc/common/StatusCode.java | 3 +-
.../protocol/grpc/consumer/ConsumerManager.java | 22 ++++----
.../grpc/processor/HeartbeatProcessor.java | 9 +++-
.../grpc/consumer/EventMeshGrpcConsumer.java | 63 ++++++++++++++++++++--
4 files changed, 78 insertions(+), 19 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java
index 15831d8c..e7cf9a37 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java
@@ -41,7 +41,8 @@ public enum StatusCode {
EVENTMESH_HEARTBEAT_ERR("19", "eventMesh heartbeat err"),
EVENTMESH_ACL_ERR("20", "eventMesh acl err"),
EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR("21", "eventMesh send message speed over the limit err."),
- EVENTMESH_REQUEST_REPLY_MSG_ERR("22", "eventMesh request reply msg err, ");
+ EVENTMESH_REQUEST_REPLY_MSG_ERR("22", "eventMesh request reply msg err, "),
+ CLIENT_RESUBSCRIBE("30", "client needs to resubscribe.");
private String retCode;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java
index 1f881153..cc32f7d1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java
@@ -126,21 +126,21 @@ public class ConsumerManager {
}
}
- public void updateClientTime(ConsumerGroupClient client) {
+ public boolean updateClientTime(ConsumerGroupClient client) {
String consumerGroup = client.getConsumerGroup();
List<ConsumerGroupClient> localClients = clientTable.get(consumerGroup);
- if (CollectionUtils.isEmpty(localClients)) {
- return;
- }
- for (ConsumerGroupClient localClient : localClients) {
- if (StringUtils.equals(localClient.getIp(), client.getIp())
- && StringUtils.equals(localClient.getPid(), client.getPid())
- && StringUtils.equals(localClient.getSys(), client.getSys())
- && StringUtils.equals(localClient.getTopic(), client.getTopic())) {
- localClient.setLastUpTime(new Date());
- break;
+ if (CollectionUtils.isNotEmpty(localClients)) {
+ for (ConsumerGroupClient localClient : localClients) {
+ if (StringUtils.equals(localClient.getIp(), client.getIp())
+ && StringUtils.equals(localClient.getPid(), client.getPid())
+ && StringUtils.equals(localClient.getSys(), client.getSys())
+ && StringUtils.equals(localClient.getTopic(), client.getTopic())) {
+ localClient.setLastUpTime(new Date());
+ return true;
+ }
}
}
+ return false;
}
public synchronized void deregisterClient(ConsumerGroupClient client) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java
index 4680d29a..d295dc94 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java
@@ -90,7 +90,12 @@ public class HeartbeatProcessor {
.topic(item.getTopic())
.lastUpTime(new Date())
.build();
- consumerManager.updateClientTime(hbClient);
+
+ // consumer group client is lost, and the client needs to resubscribe.
+ if (!consumerManager.updateClientTime(hbClient)) {
+ ServiceUtils.sendRespAndDone(StatusCode.CLIENT_RESUBSCRIBE, emitter);
+ return;
+ }
}
ServiceUtils.sendRespAndDone(StatusCode.SUCCESS, "heartbeat success", emitter);
@@ -103,7 +108,7 @@ public class HeartbeatProcessor {
String user = header.getUsername();
String pass = header.getPassword();
String sys = header.getSys();
- int requestCode = Integer.valueOf(RequestCode.HEARTBEAT.getRequestCode());
+ int requestCode = RequestCode.HEARTBEAT.getRequestCode();
for (Heartbeat.HeartbeatItem item : heartbeat.getHeartbeatItemsList()) {
Acl.doAclCheckInHttpHeartbeat(remoteAdd, user, pass, sys, item.getTopic(), requestCode);
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
index f181f655..54e3738e 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
@@ -17,12 +17,16 @@
package org.apache.eventmesh.client.grpc.consumer;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc.ConsumerServiceBlockingStub;
import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc.ConsumerServiceStub;
@@ -40,6 +44,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +62,8 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
private final EventMeshGrpcClientConfig clientConfig;
- private final Map<String, String> subscriptionMap = new ConcurrentHashMap<>();
+ private final Map<String, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap<>();
+
private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setNameFormat("GRPCClientScheduler").setDaemon(true).build());
@@ -124,7 +130,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
private void addSubscription(List<SubscriptionItem> subscriptionItems, String url) {
for (SubscriptionItem item : subscriptionItems) {
- subscriptionMap.put(item.getTopic(), url);
+ subscriptionMap.put(item.getTopic(), new SubscriptionInfo(item, url));
}
}
@@ -164,7 +170,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
// there is no stream subscriptions, stop the subscription stream handler
synchronized (this) {
- if (!subscriptionMap.containsValue(SDK_STREAM_URL) && subStreamHandler != null) {
+ if (subscriptionMap.isEmpty() && subStreamHandler != null) {
subStreamHandler.close();
subStreamHandler = null;
}
@@ -231,10 +237,11 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
.setConsumerGroup(clientConfig.getConsumerGroup())
.setClientType(Heartbeat.ClientType.SUB);
- for (Map.Entry<String, String> entry : subscriptionMap.entrySet()) {
+ for (Map.Entry<String, SubscriptionInfo> entry : subscriptionMap.entrySet()) {
Heartbeat.HeartbeatItem heartbeatItem = Heartbeat.HeartbeatItem
.newBuilder()
- .setTopic(entry.getKey()).setUrl(entry.getValue())
+ .setTopic(entry.getKey())
+ .setUrl(entry.getValue().getUrl())
.build();
heartbeatBuilder.addHeartbeatItems(heartbeatItem);
}
@@ -245,6 +252,10 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
if (logger.isDebugEnabled()) {
logger.debug("Grpc Consumer Heartbeat response: {}", response);
}
+
+ if (StatusCode.CLIENT_RESUBSCRIBE.getRetCode().equals(response.getRespCode())) {
+ resubscribe();
+ }
} catch (Exception e) {
logger.error("Error in sending out heartbeat. error {}", e.getMessage());
}
@@ -253,6 +264,22 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
logger.info("Grpc Consumer Heartbeat started.");
}
+ private void resubscribe() {
+ if (subscriptionMap.isEmpty()) {
+ return;
+ }
+
+ Map<String, List<SubscriptionItem>> subscriptionGroup =
+ subscriptionMap.values().stream()
+ .collect(Collectors.groupingBy(SubscriptionInfo::getUrl,
+ mapping(SubscriptionInfo::getSubscriptionItem, toList())));
+
+ subscriptionGroup.forEach((url, items) -> {
+ Subscription subscription = buildSubscription(items, url);
+ subStreamHandler.sendSubscription(subscription);
+ });
+ }
+
@Override
public void close() {
if (this.subStreamHandler != null) {
@@ -261,4 +288,30 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
channel.shutdown();
scheduler.shutdown();
}
+
+ private static class SubscriptionInfo {
+ private SubscriptionItem subscriptionItem;
+ private String url;
+
+ SubscriptionInfo(SubscriptionItem subscriptionItem, String url) {
+ this.subscriptionItem = subscriptionItem;
+ this.url = url;
+ }
+
+ public SubscriptionItem getSubscriptionItem() {
+ return subscriptionItem;
+ }
+
+ public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
+ this.subscriptionItem = subscriptionItem;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org