You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/07 06:03:00 UTC

[incubator-eventmesh] branch master updated: Fix the client not resubscribing after the gRPC service is restarted.

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 94545dcb Fix the client not resubscribing after the gRPC service is restarted.
     new 7ffb4870 Merge pull request #1454 from mytang0/grpc_client_resubscribe
94545dcb is described below

commit 94545dcbc96256e65ca95d7475d11bd8d3a11a24
Author: 鹿鸣 <me...@163.com>
AuthorDate: Thu Sep 29 13:57:32 2022 +0800

    Fix the client not resubscribing after the gRPC service is restarted.
---
 .../common/protocol/grpc/common/StatusCode.java    |  3 +-
 .../protocol/grpc/consumer/ConsumerManager.java    | 22 ++++----
 .../grpc/processor/HeartbeatProcessor.java         |  9 +++-
 .../grpc/consumer/EventMeshGrpcConsumer.java       | 63 ++++++++++++++++++++--
 4 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java
index 15831d8c..e7cf9a37 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/StatusCode.java
@@ -41,7 +41,8 @@ public enum StatusCode {
     EVENTMESH_HEARTBEAT_ERR("19", "eventMesh heartbeat err"),
     EVENTMESH_ACL_ERR("20", "eventMesh acl err"),
     EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR("21", "eventMesh send message speed over the limit err."),
-    EVENTMESH_REQUEST_REPLY_MSG_ERR("22", "eventMesh request reply msg err, ");
+    EVENTMESH_REQUEST_REPLY_MSG_ERR("22", "eventMesh request reply msg err, "),
+    CLIENT_RESUBSCRIBE("30", "client needs to resubscribe.");
 
     private String retCode;
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java
index 1f881153..cc32f7d1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java
@@ -126,21 +126,21 @@ public class ConsumerManager {
         }
     }
 
-    public void updateClientTime(ConsumerGroupClient client) {
+    public boolean updateClientTime(ConsumerGroupClient client) {
         String consumerGroup = client.getConsumerGroup();
         List<ConsumerGroupClient> localClients = clientTable.get(consumerGroup);
-        if (CollectionUtils.isEmpty(localClients)) {
-            return;
-        }
-        for (ConsumerGroupClient localClient : localClients) {
-            if (StringUtils.equals(localClient.getIp(), client.getIp())
-                && StringUtils.equals(localClient.getPid(), client.getPid())
-                && StringUtils.equals(localClient.getSys(), client.getSys())
-                && StringUtils.equals(localClient.getTopic(), client.getTopic())) {
-                localClient.setLastUpTime(new Date());
-                break;
+        if (CollectionUtils.isNotEmpty(localClients)) {
+            for (ConsumerGroupClient localClient : localClients) {
+                if (StringUtils.equals(localClient.getIp(), client.getIp())
+                    && StringUtils.equals(localClient.getPid(), client.getPid())
+                    && StringUtils.equals(localClient.getSys(), client.getSys())
+                    && StringUtils.equals(localClient.getTopic(), client.getTopic())) {
+                    localClient.setLastUpTime(new Date());
+                    return true;
+                }
             }
         }
+        return false;
     }
 
     public synchronized void deregisterClient(ConsumerGroupClient client) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java
index 4680d29a..d295dc94 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java
@@ -90,7 +90,12 @@ public class HeartbeatProcessor {
                 .topic(item.getTopic())
                 .lastUpTime(new Date())
                 .build();
-            consumerManager.updateClientTime(hbClient);
+
+            // consumer group client is lost, and the client needs to resubscribe.
+            if (!consumerManager.updateClientTime(hbClient)) {
+                ServiceUtils.sendRespAndDone(StatusCode.CLIENT_RESUBSCRIBE, emitter);
+                return;
+            }
         }
 
         ServiceUtils.sendRespAndDone(StatusCode.SUCCESS, "heartbeat success", emitter);
@@ -103,7 +108,7 @@ public class HeartbeatProcessor {
             String user = header.getUsername();
             String pass = header.getPassword();
             String sys = header.getSys();
-            int requestCode = Integer.valueOf(RequestCode.HEARTBEAT.getRequestCode());
+            int requestCode = RequestCode.HEARTBEAT.getRequestCode();
             for (Heartbeat.HeartbeatItem item : heartbeat.getHeartbeatItemsList()) {
                 Acl.doAclCheckInHttpHeartbeat(remoteAdd, user, pass, sys, item.getTopic(), requestCode);
             }
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
index f181f655..54e3738e 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
@@ -17,12 +17,16 @@
 
 package org.apache.eventmesh.client.grpc.consumer;
 
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
 import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
 import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil;
 import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
 import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc;
 import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc.ConsumerServiceBlockingStub;
 import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc.ConsumerServiceStub;
@@ -40,6 +44,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +62,8 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
 
     private final EventMeshGrpcClientConfig clientConfig;
 
-    private final Map<String, String> subscriptionMap = new ConcurrentHashMap<>();
+    private final Map<String, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
         Runtime.getRuntime().availableProcessors(),
         new ThreadFactoryBuilder().setNameFormat("GRPCClientScheduler").setDaemon(true).build());
@@ -124,7 +130,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
 
     private void addSubscription(List<SubscriptionItem> subscriptionItems, String url) {
         for (SubscriptionItem item : subscriptionItems) {
-            subscriptionMap.put(item.getTopic(), url);
+            subscriptionMap.put(item.getTopic(), new SubscriptionInfo(item, url));
         }
     }
 
@@ -164,7 +170,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
 
             // there is no stream subscriptions, stop the subscription stream handler
             synchronized (this) {
-                if (!subscriptionMap.containsValue(SDK_STREAM_URL) && subStreamHandler != null) {
+                if (subscriptionMap.isEmpty() && subStreamHandler != null) {
                     subStreamHandler.close();
                     subStreamHandler = null;
                 }
@@ -231,10 +237,11 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
                 .setConsumerGroup(clientConfig.getConsumerGroup())
                 .setClientType(Heartbeat.ClientType.SUB);
 
-            for (Map.Entry<String, String> entry : subscriptionMap.entrySet()) {
+            for (Map.Entry<String, SubscriptionInfo> entry : subscriptionMap.entrySet()) {
                 Heartbeat.HeartbeatItem heartbeatItem = Heartbeat.HeartbeatItem
                     .newBuilder()
-                    .setTopic(entry.getKey()).setUrl(entry.getValue())
+                    .setTopic(entry.getKey())
+                    .setUrl(entry.getValue().getUrl())
                     .build();
                 heartbeatBuilder.addHeartbeatItems(heartbeatItem);
             }
@@ -245,6 +252,10 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Grpc Consumer Heartbeat response: {}", response);
                 }
+
+                if (StatusCode.CLIENT_RESUBSCRIBE.getRetCode().equals(response.getRespCode())) {
+                    resubscribe();
+                }
             } catch (Exception e) {
                 logger.error("Error in sending out heartbeat. error {}", e.getMessage());
             }
@@ -253,6 +264,22 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
         logger.info("Grpc Consumer Heartbeat started.");
     }
 
+    /**
+     * Re-sends every locally recorded subscription through the subscription stream handler.
+     *
+     * <p>Invoked by the heartbeat task when the heartbeat response carries
+     * {@code StatusCode.CLIENT_RESUBSCRIBE}. The entries of {@code subscriptionMap} are
+     * grouped by url so that one {@code Subscription} request is built per distinct url.
+     * Does nothing when there are no recorded subscriptions or when no stream handler is
+     * currently active.
+     */
+    private void resubscribe() {
+        if (subscriptionMap.isEmpty()) {
+            return;
+        }
+
+        Map<String, List<SubscriptionItem>> subscriptionGroup =
+            subscriptionMap.values().stream()
+                .collect(Collectors.groupingBy(SubscriptionInfo::getUrl,
+                    mapping(SubscriptionInfo::getSubscriptionItem, toList())));
+
+        // The handler is closed and reset to null under this same monitor when subscriptions
+        // are removed, so check and use it while holding the lock instead of dereferencing a
+        // field that may be null.
+        synchronized (this) {
+            if (subStreamHandler == null) {
+                logger.warn("Skip resubscribe because the subscription stream handler is not available.");
+                return;
+            }
+
+            // NOTE(review): every group is sent through the stream handler, including groups
+            // whose url is not the stream url - confirm this is intended for url-based
+            // subscriptions.
+            subscriptionGroup.forEach((url, items) -> {
+                Subscription subscription = buildSubscription(items, url);
+                subStreamHandler.sendSubscription(subscription);
+            });
+        }
+    }
+
     @Override
     public void close() {
         if (this.subStreamHandler != null) {
@@ -261,4 +288,30 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
         channel.shutdown();
         scheduler.shutdown();
     }
+
+    /**
+     * Holder that pairs a subscription item with the url it was registered under.
+     * Instances are stored as the values of {@code subscriptionMap}, keyed by topic.
+     */
+    private static class SubscriptionInfo {
+        private String url;
+        private SubscriptionItem subscriptionItem;
+
+        /**
+         * Creates a holder for the given item and url.
+         *
+         * @param item      the subscription item to remember
+         * @param targetUrl the url associated with the subscription
+         */
+        SubscriptionInfo(SubscriptionItem item, String targetUrl) {
+            subscriptionItem = item;
+            url = targetUrl;
+        }
+
+        /** Returns the url associated with the subscription. */
+        public String getUrl() {
+            return this.url;
+        }
+
+        /** Replaces the url associated with the subscription. */
+        public void setUrl(String url) {
+            this.url = url;
+        }
+
+        /** Returns the stored subscription item. */
+        public SubscriptionItem getSubscriptionItem() {
+            return this.subscriptionItem;
+        }
+
+        /** Replaces the stored subscription item. */
+        public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
+            this.subscriptionItem = subscriptionItem;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org