Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/11/23 02:00:09 UTC

[GitHub] [rocketmq] drpmma opened a new pull request, #5575: [ISSUE #5392] Support remoting protocol in rocketmq proxy module

drpmma opened a new pull request, #5575:
URL: https://github.com/apache/rocketmq/pull/5575

   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   Support the remoting protocol in the RocketMQ proxy module, including rocketmq-client versions >= 4.9.5 on the 4.9.x branch and versions >= 5.0.1 on the develop branch.
   
   ## Brief changelog
   
   See [wiki](https://github.com/apache/rocketmq/wiki/RIP-55-Support-remoting-protocol-in-rocketmq-proxy-module) for more information.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1038745930


##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java:
##########
@@ -205,6 +206,32 @@ public CompletableFuture<Long> getMinOffset(ProxyContext ctx, AddressableMessage
         );
     }
 
+    @Override
+    public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        try {
+            String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+            return mqClientAPIFactory.getClient().invoke(brokerAddress, request, timeoutMillis);
+        } catch (Exception e) {
+            CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        try {
+            String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+            return mqClientAPIFactory.getClient().invokeOneway(brokerAddress, request, timeoutMillis);
+        } catch (Exception e) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }

Review Comment:
   ditto.





[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1038744863


##########
common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java:
##########
@@ -40,6 +42,22 @@ public String getValue() {
         return value;
     }
 
+    public static TopicMessageType parseFromMessageProperty(Map<String, String> messageProperty) {
+        String isTrans = messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+        String isTransValue = "true";
+        if (isTransValue.equals(isTrans)) {
+            return TopicMessageType.TRANSACTION;
+        } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
+            || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
+            || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
+            return TopicMessageType.DELAY;
+        } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) {
+            return TopicMessageType.FIFO;
+        } else {
+            return TopicMessageType.NORMAL;
+        }
+    }

Review Comment:
   The `else` here seems redundant.
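One possible reading of this comment: every branch of the chain returns, so the `else` keywords can be dropped in favour of early returns. The sketch below is a minimal, self-contained illustration of that shape. The class name, the enum, and the property keys are placeholders invented for this example; they are not the real `TopicMessageType` or `MessageConst` values.

```java
import java.util.Map;

// Hypothetical stand-in for the method under review; names and keys are assumptions.
final class MessageTypeSketch {
    enum Type { TRANSACTION, DELAY, FIFO, NORMAL }

    // Placeholder property keys used only in this sketch.
    static final String TRANSACTION_PREPARED = "prepared";
    static final String DELAY_LEVEL = "delayLevel";
    static final String SHARDING_KEY = "shardingKey";

    private MessageTypeSketch() {
    }

    static Type parse(Map<String, String> props) {
        // Each check returns immediately, so no else branch is needed.
        if ("true".equals(props.get(TRANSACTION_PREPARED))) {
            return Type.TRANSACTION;
        }
        if (props.get(DELAY_LEVEL) != null) {
            return Type.DELAY;
        }
        if (props.get(SHARDING_KEY) != null) {
            return Type.FIFO;
        }
        return Type.NORMAL;
    }
}
```

The precedence of the checks is the same as in an `if`/`else if` chain; only the nesting changes.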





[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1038744913


##########
proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.channel;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+
+public class ChannelHelper {
+
+    /**
+     * judge channel is sync from other proxy or not
+     *
+     * @param channel channel
+     * @return true if is sync from other proxy
+     */
+    public static boolean isRemote(Channel channel) {
+        return channel instanceof RemoteChannel;
+    }
+
+    public static ChannelProtocolType getChannelProtocolType(Channel channel) {
+        if (channel instanceof GrpcClientChannel) {
+            return ChannelProtocolType.GRPC_V2;
+        } else if (channel instanceof RemotingChannel) {
+            return ChannelProtocolType.REMOTING;
+        } else if (channel instanceof RemoteChannel) {
+            RemoteChannel remoteChannel = (RemoteChannel) channel;
+            return remoteChannel.getType();
+        }

Review Comment:
   ditto.





[GitHub] [rocketmq] lizhimins commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
lizhimins commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1039332179


##########
common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java:
##########
@@ -40,6 +42,22 @@ public String getValue() {
         return value;
     }
 
+    public static TopicMessageType parseFromMessageProperty(Map<String, String> messageProperty) {
+        String isTrans = messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+        String isTransValue = "true";

Review Comment:
   Using `Boolean` may be better.
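A rough sketch of what this suggestion could look like, assuming the intent is to replace the `"true"` string comparison with `Boolean.parseBoolean`. The class name and the property key are placeholders for this example only. Note one behavioural difference: `Boolean.parseBoolean` ignores case, whereas `"true".equals(...)` does not.

```java
import java.util.Map;

// Hypothetical helper; the key below is a placeholder, not the real MessageConst value.
final class TransactionFlagSketch {
    static final String PREPARED_KEY = "prepared";

    private TransactionFlagSketch() {
    }

    static boolean isTransactionPrepared(Map<String, String> props) {
        // parseBoolean returns false for null and compares against "true" ignoring case.
        return Boolean.parseBoolean(props.get(PREPARED_KEY));
    }
}
```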





[GitHub] [rocketmq] echooymxq commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
echooymxq commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1043089248


##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+
+public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
+
+    protected ThreadPoolExecutor threadPoolExecutor;
+    protected ConsumerManager consumerManager;
+    protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+
+    public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
+        ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+        super(topicRouteService, adminService, mqClientAPIFactory);
+        this.consumerManager = consumerManager;
+        this.init();
+    }
+
+    protected void init() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        this.threadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            1,
+            TimeUnit.MINUTES,
+            "HeartbeatSyncer",
+            proxyConfig.getHeartbeatSyncerThreadPoolQueueCapacity()
+        );
+        this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+            @Override
+            public void handle(ConsumerGroupEvent event, String s, Object... args) {
+                if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+                    if (args == null || args.length < 1) {
+                        return;
+                    }
+                    if (args[0] instanceof ClientChannelInfo) {
+                        ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                        remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
+                    }
+                }
+            }
+
+            @Override
+            public void shutdown() {
+
+            }
+        });
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+        this.threadPoolExecutor.shutdown();
+        super.shutdown();
+    }
+
+    public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+                    if (remoteChannel == null) {
+                        return;
+                    }
+                    HeartbeatSyncerData data = new HeartbeatSyncerData(
+                        HeartbeatType.REGISTER,
+                        clientChannelInfo.getClientId(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        consumerGroup,
+                        consumeType,
+                        messageModel,
+                        consumeFromWhere,
+                        proxyConfig.getLocalServeAddr(),
+                        remoteChannel.encode()
+                    );
+                    data.setSubscriptionDataSet(subList);
+
+                    log.debug("sync register heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                        consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+        }
+    }
+
+    public void onConsumerUnRegister(String consumerGroup, ClientChannelInfo clientChannelInfo) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+                    if (remoteChannel == null) {
+                        return;
+                    }
+                    HeartbeatSyncerData data = new HeartbeatSyncerData(
+                        HeartbeatType.UNREGISTER,
+                        clientChannelInfo.getClientId(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        consumerGroup,
+                        null,
+                        null,
+                        null,
+                        proxyConfig.getLocalServeAddr(),
+                        remoteChannel.encode()
+                    );
+
+                    log.debug("sync unregister heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}",
+                        consumerGroup, clientChannelInfo, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit unregister broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}",
+                consumerGroup, clientChannelInfo, t);
+        }
+    }
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        if (msgs == null || msgs.isEmpty()) {
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+
+        for (MessageExt msg : msgs) {
+            try {
+                HeartbeatSyncerData data = JSON.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
+                if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+                    continue;
+                }

Review Comment:
   If there are two proxy instances on one machine, will there be a problem? With the same address, heartbeat data from the two instances cannot be told apart; maybe we need a unique proxyId?
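To illustrate the concern, here is a minimal sketch of filtering by a per-instance identifier rather than by address. Everything in it is hypothetical: the class, the `proxyId` field, and the use of a random UUID are assumptions made for this example and are not part of the PR.

```java
import java.util.UUID;

// Hypothetical model of a proxy instance that tags its heartbeats with a unique id.
final class ProxyIdentitySketch {
    private final String localServeAddr;
    private final String proxyId;

    ProxyIdentitySketch(String localServeAddr) {
        this.localServeAddr = localServeAddr;
        // A random UUID is generated once per instance, so two proxies that
        // report the same address still get different ids.
        this.proxyId = UUID.randomUUID().toString();
    }

    String getLocalServeAddr() {
        return localServeAddr;
    }

    String getProxyId() {
        return proxyId;
    }

    /** Returns true only when the heartbeat was sent by this very instance. */
    boolean sentBySelf(String senderProxyId) {
        return proxyId.equals(senderProxyId);
    }
}
```

With an address-only comparison, two instances reporting the same address would each skip the other's heartbeats; comparing ids avoids that.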





[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1038745901


##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java:
##########
@@ -205,6 +206,32 @@ public CompletableFuture<Long> getMinOffset(ProxyContext ctx, AddressableMessage
         );
     }
 
+    @Override
+    public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        try {
+            String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+            return mqClientAPIFactory.getClient().invoke(brokerAddress, request, timeoutMillis);
+        } catch (Exception e) {
+            CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;

Review Comment:
   Use `FutureUtils#completeExceptionally` instead.
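For context, a helper of this kind typically wraps the three-line "create, fail, return" pattern from the diff. The sketch below is a stand-in written for illustration; the actual signature of `FutureUtils#completeExceptionally` in the proxy module may differ, and `lookup` is a made-up caller.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the helper named in the review comment.
final class FutureUtilsSketch {
    private FutureUtilsSketch() {
    }

    /** Returns a future that is already completed exceptionally with the given cause. */
    static <T> CompletableFuture<T> completeExceptionally(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    /** Made-up caller showing how the catch block shrinks to a single line. */
    static CompletableFuture<String> lookup(String brokerName) {
        try {
            if (brokerName == null) {
                throw new IllegalArgumentException("brokerName is null");
            }
            return CompletableFuture.completedFuture("addr-of-" + brokerName);
        } catch (Exception e) {
            return completeExceptionally(e);
        }
    }
}
```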





[GitHub] [rocketmq] drpmma merged pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
drpmma merged PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575




[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
ShadowySpirits commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1034471391


##########
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java:
##########
@@ -413,14 +417,48 @@ protected class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListen
 
         @Override
         public void handle(ConsumerGroupEvent event, String group, Object... args) {
-            if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
-                if (args == null || args.length < 1) {
+            switch (event) {
+                case CLIENT_UNREGISTER:
+                    processClientUnregister(group, args);
+                    break;
+                case REGISTER:
+                    processClientRegister(group, args);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        protected void processClientUnregister(String group, Object... args) {
+            if (args == null || args.length < 1) {
+                return;
+            }
+            if (args[0] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
                     return;
                 }
-                if (args[0] instanceof ClientChannelInfo) {
-                    ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
-                    grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
-                    grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
+                GrpcClientChannel removedChannel = grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
+                log.info("remove grpc channel when client unregister. group:{}, clientChannelInfo:{}, removed:{}",
+                    group, clientChannelInfo, removedChannel != null);
+            }
+        }
+
+        protected void processClientRegister(String group, Object... args) {
+            if (args == null || args.length < 2) {
+                return;
+            }
+            if (args[1] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[1];
+                Channel channel = clientChannelInfo.getChannel();
+                if (ChannelHelper.isRemote(channel)) {
+                    // save settings from channel sync from other proxy
+                    Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel);

Review Comment:
   This looks odd: settings are parsed from the channel, but grpcClientSettingsManager is an argument used to build the Channel.
   @xdkxlk Do we have a cleaner way to resolve this circular dependency?





[GitHub] [rocketmq] codecov-commenter commented on pull request #5575: [ISSUE #5392] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#issuecomment-1326255693

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/5575?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#5575](https://codecov.io/gh/apache/rocketmq/pull/5575?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7f53438) into [develop](https://codecov.io/gh/apache/rocketmq/commit/ee37e3a55fe21e0720cb10291bea65392626c775?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ee37e3a) will **decrease** coverage by `0.18%`.
   > The diff coverage is `30.98%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #5575      +/-   ##
   =============================================
   - Coverage      42.43%   42.25%   -0.19%     
   - Complexity      7936     8088     +152     
   =============================================
     Files           1022     1059      +37     
     Lines          71243    72613    +1370     
     Branches        9399     9500     +101     
   =============================================
   + Hits           30234    30679     +445     
   - Misses         37181    38073     +892     
   - Partials        3828     3861      +33     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/5575?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...apache/rocketmq/broker/client/ProducerManager.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvY2xpZW50L1Byb2R1Y2VyTWFuYWdlci5qYXZh) | `72.72% <ø> (ø)` | |
   | [...he/rocketmq/common/attribute/TopicMessageType.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vYXR0cmlidXRlL1RvcGljTWVzc2FnZVR5cGUuamF2YQ==) | `47.82% <0.00%> (-43.85%)` | :arrow_down: |
   | [...rg/apache/rocketmq/common/sysflag/PullSysFlag.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vc3lzZmxhZy9QdWxsU3lzRmxhZy5qYXZh) | `30.43% <0.00%> (-1.39%)` | :arrow_down: |
   | [...common/thread/FutureTaskExtThreadPoolExecutor.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vdGhyZWFkL0Z1dHVyZVRhc2tFeHRUaHJlYWRQb29sRXhlY3V0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ache/rocketmq/common/thread/ThreadPoolMonitor.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vdGhyZWFkL1RocmVhZFBvb2xNb25pdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...n/java/org/apache/rocketmq/proxy/ProxyStartup.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHJveHkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Byb3h5L1Byb3h5U3RhcnR1cC5qYXZh) | `43.96% <0.00%> (-0.78%)` | :arrow_down: |
   | [...pache/rocketmq/proxy/common/utils/FutureUtils.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHJveHkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Byb3h5L2NvbW1vbi91dGlscy9GdXR1cmVVdGlscy5qYXZh) | `63.63% <0.00%> (-23.87%)` | :arrow_down: |
   | [.../apache/rocketmq/proxy/grpc/GrpcServerBuilder.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHJveHkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Byb3h5L2dycGMvR3JwY1NlcnZlckJ1aWxkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...etmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHJveHkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Byb3h5L2dycGMvdjIvRGVmYXVsdEdycGNNZXNzaW5nQWN0aXZpdHkuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../rocketmq/proxy/grpc/v2/client/ClientActivity.java](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHJveHkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Byb3h5L2dycGMvdjIvY2xpZW50L0NsaWVudEFjdGl2aXR5LmphdmE=) | `62.44% <0.00%> (-5.76%)` | :arrow_down: |
   | ... and [110 more](https://codecov.io/gh/apache/rocketmq/pull/5575/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [rocketmq] xdkxlk commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
xdkxlk commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1040564992


##########
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java:
##########
@@ -413,14 +417,48 @@ protected class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListen
 
         @Override
         public void handle(ConsumerGroupEvent event, String group, Object... args) {
-            if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
-                if (args == null || args.length < 1) {
+            switch (event) {
+                case CLIENT_UNREGISTER:
+                    processClientUnregister(group, args);
+                    break;
+                case REGISTER:
+                    processClientRegister(group, args);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        protected void processClientUnregister(String group, Object... args) {
+            if (args == null || args.length < 1) {
+                return;
+            }
+            if (args[0] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
                     return;
                 }
-                if (args[0] instanceof ClientChannelInfo) {
-                    ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
-                    grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
-                    grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
+                GrpcClientChannel removedChannel = grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
+                log.info("remove grpc channel when client unregister. group:{}, clientChannelInfo:{}, removed:{}",
+                    group, clientChannelInfo, removedChannel != null);
+            }
+        }
+
+        protected void processClientRegister(String group, Object... args) {
+            if (args == null || args.length < 2) {
+                return;
+            }
+            if (args[1] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[1];
+                Channel channel = clientChannelInfo.getChannel();
+                if (ChannelHelper.isRemote(channel)) {
+                    // save settings from channel sync from other proxy
+                    Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel);

Review Comment:
   The two serve different purposes. `GrpcClientChannel.parseChannelExtendAttribute` is a utility method that tries to convert a channel's extend attribute into `Settings`. `grpcClientSettingsManager`, by contrast, is passed to the channel so that the channel's extendAttribute can be read in real time.
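
   The distinction can be illustrated with a minimal, self-contained sketch. Every name in it (`ExtendAttributeSketch`, `SettingsManager`, `LocalChannel`, `parseExtendAttribute`) is a hypothetical stand-in, not a class from the proxy module, and plain strings stand in for `Settings`. It only shows the general idea: a channel that holds a manager reference reports the current value on every read, while a stateless parse helper works on whatever snapshot it is handed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExtendAttributeSketch {

    // Hypothetical stand-in for a manager that holds the latest settings per client.
    static class SettingsManager {
        private final Map<String, String> settingsByClientId = new ConcurrentHashMap<>();

        void update(String clientId, String settings) {
            settingsByClientId.put(clientId, settings);
        }

        String get(String clientId) {
            return settingsByClientId.get(clientId);
        }
    }

    // Hypothetical channel that keeps a reference to the manager,
    // so the extend attribute is looked up again on every read.
    static class LocalChannel {
        private final String clientId;
        private final SettingsManager manager;

        LocalChannel(String clientId, SettingsManager manager) {
            this.clientId = clientId;
            this.manager = manager;
        }

        String getExtendAttribute() {
            return manager.get(clientId);
        }
    }

    // Stateless helper: converts an attribute snapshot (for example one received
    // from another proxy) and returns null when there is nothing to convert.
    static String parseExtendAttribute(String attribute) {
        if (attribute == null || attribute.trim().isEmpty()) {
            return null;
        }
        return attribute.trim();
    }

    public static void main(String[] args) {
        SettingsManager manager = new SettingsManager();
        manager.update("client-1", "requestTimeout=3s");
        LocalChannel channel = new LocalChannel("client-1", manager);

        // Snapshot taken at sync time; it does not change afterwards.
        String snapshot = channel.getExtendAttribute();
        manager.update("client-1", "requestTimeout=5s");

        System.out.println(parseExtendAttribute(snapshot)); // requestTimeout=3s
        System.out.println(channel.getExtendAttribute());   // requestTimeout=5s
    }
}
```

   In this sketch the snapshot keeps the old value after the manager is updated, which is why a parse helper alone would not be enough for a channel that must always report current settings.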





[GitHub] [rocketmq] drpmma commented on a diff in pull request #5575: [ISSUE #5392] [RIP-55] Support remoting protocol in rocketmq proxy module

Posted by GitBox <gi...@apache.org>.
drpmma commented on code in PR #5575:
URL: https://github.com/apache/rocketmq/pull/5575#discussion_r1043281696


##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+
+public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
+
+    protected ThreadPoolExecutor threadPoolExecutor;
+    protected ConsumerManager consumerManager;
+    protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+
+    public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
+        ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+        super(topicRouteService, adminService, mqClientAPIFactory);
+        this.consumerManager = consumerManager;
+        this.init();
+    }
+
+    protected void init() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        this.threadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            1,
+            TimeUnit.MINUTES,
+            "HeartbeatSyncer",
+            proxyConfig.getHeartbeatSyncerThreadPoolQueueCapacity()
+        );
+        this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+            @Override
+            public void handle(ConsumerGroupEvent event, String s, Object... args) {
+                if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+                    if (args == null || args.length < 1) {
+                        return;
+                    }
+                    if (args[0] instanceof ClientChannelInfo) {
+                        ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                        remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
+                    }
+                }
+            }
+
+            @Override
+            public void shutdown() {
+
+            }
+        });
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+        this.threadPoolExecutor.shutdown();
+        super.shutdown();
+    }
+
+    public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+                    if (remoteChannel == null) {
+                        return;
+                    }
+                    HeartbeatSyncerData data = new HeartbeatSyncerData(
+                        HeartbeatType.REGISTER,
+                        clientChannelInfo.getClientId(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        consumerGroup,
+                        consumeType,
+                        messageModel,
+                        consumeFromWhere,
+                        proxyConfig.getLocalServeAddr(),
+                        remoteChannel.encode()
+                    );
+                    data.setSubscriptionDataSet(subList);
+
+                    log.debug("sync register heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                        consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+        }
+    }
+
+    public void onConsumerUnRegister(String consumerGroup, ClientChannelInfo clientChannelInfo) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+                    if (remoteChannel == null) {
+                        return;
+                    }
+                    HeartbeatSyncerData data = new HeartbeatSyncerData(
+                        HeartbeatType.UNREGISTER,
+                        clientChannelInfo.getClientId(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        consumerGroup,
+                        null,
+                        null,
+                        null,
+                        proxyConfig.getLocalServeAddr(),
+                        remoteChannel.encode()
+                    );
+
+                    log.debug("sync unregister heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                        consumerGroup, clientChannelInfo, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                consumerGroup, clientChannelInfo, t);
+        }
+    }
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        if (msgs == null || msgs.isEmpty()) {
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+
+        for (MessageExt msg : msgs) {
+            try {
+                HeartbeatSyncerData data = JSON.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
+                if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+                    continue;
+                }

Review Comment:
   fixed


