You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:45 UTC
[rocketmq] 04/26: [ISSUE #5485] client connection management
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 56551b596b3f2f7b3e16e5f271200bceaf28734a
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Tue Nov 8 17:18:24 2022 +0800
[ISSUE #5485] client connection management
---
.../rocketmq/broker/client/ConsumerManager.java | 13 +-
.../broker/client/ConsumerManagerInterface.java | 60 ++++++
.../rocketmq/broker/client/ProducerManager.java | 10 +
.../broker/client/ProducerManagerInterface.java | 44 ++++
.../proxy/common/channel/ChannelHelper.java | 49 +++++
.../apache/rocketmq/proxy/config/ProxyConfig.java | 49 +++++
.../proxy/grpc/v2/DefaultGrpcMessingActivity.java | 2 +-
.../proxy/grpc/v2/channel/GrpcChannelManager.java | 7 +-
.../proxy/grpc/v2/channel/GrpcClientChannel.java | 71 ++++++-
.../proxy/grpc/v2/client/ClientActivity.java | 76 ++++++-
.../grpc/v2/common/GrpcClientSettingsManager.java | 9 +
.../rocketmq/proxy/processor/ClientProcessor.java | 5 +
.../proxy/processor/DefaultMessagingProcessor.java | 5 +
.../proxy/processor/MessagingProcessor.java | 2 +
.../proxy/processor/ReceiptHandleProcessor.java | 5 +
.../channel/ChannelExtendAttributeGetter.java | 23 +++
.../processor/channel/ChannelProtocolType.java | 35 ++++
.../proxy/processor/channel/RemoteChannel.java | 98 +++++++++
.../processor/channel/RemoteChannelConverter.java | 23 +++
.../processor/channel/RemoteChannelSerializer.java | 65 ++++++
.../proxy/remoting/ClientHousekeepingService.java | 53 +++++
.../proxy/remoting/RemotingProtocolServer.java | 72 +++++++
.../proxy/remoting/RemotingProxyOutClient.java | 27 +++
.../remoting/activity/ClientManagerActivity.java | 178 ++++++++++++++++
.../proxy/remoting/channel/RemotingChannel.java | 224 +++++++++++++++++++++
.../remoting/channel/RemotingChannelManager.java | 142 +++++++++++++
.../proxy/remoting/common/RemotingConverter.java | 74 +++++++
.../proxy/service/ClusterServiceManager.java | 17 +-
.../proxy/service/LocalServiceManager.java | 11 +-
.../rocketmq/proxy/service/ServiceManager.java | 11 +-
.../rocketmq/proxy/service/admin/AdminService.java | 32 +++
.../proxy/service/admin/DefaultAdminService.java | 142 +++++++++++++
.../proxy/service/channel/SimpleChannel.java | 8 +
.../service/client/ClusterConsumerManager.java | 70 +++++++
.../sysmessage/AbstractSystemMessageSyncer.java | 190 +++++++++++++++++
.../proxy/service/sysmessage/HeartbeatSyncer.java | 224 +++++++++++++++++++++
.../service/sysmessage/HeartbeatSyncerData.java | 164 +++++++++++++++
.../proxy/service/sysmessage/HeartbeatType.java | 23 +++
.../mqclient/ProxyClientRemotingProcessorTest.java | 5 +-
39 files changed, 2285 insertions(+), 33 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 5f95ac1af..0582ce75e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -64,6 +64,7 @@ public class ConsumerManager {
this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
}
+ @Override
public ClientChannelInfo findChannel(final String group, final String clientId) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (consumerGroupInfo != null) {
@@ -72,6 +73,7 @@ public class ConsumerManager {
return null;
}
+ @Override
public ClientChannelInfo findChannel(final String group, final Channel channel) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (consumerGroupInfo != null) {
@@ -80,6 +82,7 @@ public class ConsumerManager {
return null;
}
+ @Override
public SubscriptionData findSubscriptionData(final String group, final String topic) {
return findSubscriptionData(group, topic, true);
}
@@ -107,6 +110,7 @@ public class ConsumerManager {
return this.consumerTable;
}
+ @Override
public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
return getConsumerGroupInfo(group, false);
}
@@ -119,6 +123,7 @@ public class ConsumerManager {
return consumerGroupInfo;
}
+ @Override
public int findSubscriptionDataCount(final String group) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
@@ -128,6 +133,7 @@ public class ConsumerManager {
return 0;
}
+ @Override
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
boolean removed = false;
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -172,6 +178,7 @@ public class ConsumerManager {
isNotifyConsumerIdsChangedEnable, true);
}
+ @Override
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
@@ -202,11 +209,12 @@ public class ConsumerManager {
this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start));
}
- callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList);
+ callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo);
return r1 || r2;
}
+ @Override
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
@@ -252,6 +260,7 @@ public class ConsumerManager {
}
}
+ @Override
public void scanNotActiveChannel() {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
@@ -286,6 +295,7 @@ public class ConsumerManager {
removeExpireConsumerGroupInfo();
}
+ @Override
public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -300,6 +310,7 @@ public class ConsumerManager {
return groups;
}
+ @Override
public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
consumerIdsChangeListenerList.add(listener);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
new file mode 100644
index 000000000..895a2e491
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+import java.util.Set;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+public interface ConsumerManagerInterface {
+
+ ClientChannelInfo findChannel(String group, String clientId);
+
+ ClientChannelInfo findChannel(String group, Channel channel);
+
+ SubscriptionData findSubscriptionData(String group, String topic);
+
+ ConsumerGroupInfo getConsumerGroupInfo(String group);
+
+ int findSubscriptionDataCount(String group);
+
+ boolean doChannelCloseEvent(String remoteAddr, Channel channel);
+
+ default boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
+ ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+ Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+ return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList,
+ isNotifyConsumerIdsChangedEnable, true);
+ }
+
+ boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
+ ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+ Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription);
+
+ void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo,
+ boolean isNotifyConsumerIdsChangedEnable);
+
+ void scanNotActiveChannel();
+
+ Set<String> queryTopicConsumeByWho(String topic);
+
+ void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener);
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 52d67bf28..047aa8be9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -54,10 +54,12 @@ public class ProducerManager {
this.brokerStatsManager = brokerStatsManager;
}
+ @Override
public int groupSize() {
return this.groupChannelTable.size();
}
+ @Override
public boolean groupOnline(String group) {
Map<Channel, ClientChannelInfo> channels = this.groupChannelTable.get(group);
return channels != null && !channels.isEmpty();
@@ -67,6 +69,7 @@ public class ProducerManager {
return groupChannelTable;
}
+ @Override
public ProducerTableInfo getProducerTable() {
Map<String, List<ProducerInfo>> map = new HashMap<>();
for (String group : this.groupChannelTable.keySet()) {
@@ -94,6 +97,7 @@ public class ProducerManager {
return new ProducerTableInfo(map);
}
+ @Override
public void scanNotActiveChannel() {
Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
@@ -129,6 +133,7 @@ public class ProducerManager {
}
}
+ @Override
public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
boolean removed = false;
if (channel != null) {
@@ -160,6 +165,7 @@ public class ProducerManager {
return removed;
}
+ @Override
public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo clientChannelInfoFound = null;
@@ -183,6 +189,7 @@ public class ProducerManager {
}
}
+ @Override
public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
@@ -202,6 +209,7 @@ public class ProducerManager {
}
}
+ @Override
public Channel getAvailableChannel(String groupId) {
if (groupId == null) {
return null;
@@ -242,6 +250,7 @@ public class ProducerManager {
return lastActiveChannel;
}
+ @Override
public Channel findChannel(String clientId) {
return clientChannelTable.get(clientId);
}
@@ -257,6 +266,7 @@ public class ProducerManager {
}
}
+ @Override
public void appendProducerChangeListener(ProducerChangeListener producerChangeListener) {
producerChangeListenerList.add(producerChangeListener);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
new file mode 100644
index 000000000..3f2ece7cd
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+
+public interface ProducerManagerInterface {
+
+ int groupSize();
+
+ boolean groupOnline(String group);
+
+ ProducerTableInfo getProducerTable();
+
+ void scanNotActiveChannel();
+
+ boolean doChannelCloseEvent(String remoteAddr, Channel channel);
+
+ void registerProducer(String group, ClientChannelInfo clientChannelInfo);
+
+ void unregisterProducer(String group, ClientChannelInfo clientChannelInfo);
+
+ Channel getAvailableChannel(String groupId);
+
+ Channel findChannel(String clientId);
+
+ void appendProducerChangeListener(ProducerChangeListener producerChangeListener);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java
new file mode 100644
index 000000000..dd15c85fb
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.channel;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+
+public class ChannelHelper {
+
+ /**
+ * Checks whether the channel was synced from another proxy.
+ *
+ * @param channel the channel to check
+ * @return true if the channel was synced from another proxy
+ */
+ public static boolean isRemote(Channel channel) {
+ return channel instanceof RemoteChannel;
+ }
+
+ public static ChannelProtocolType getChannelProtocolType(Channel channel) {
+ if (channel instanceof GrpcClientChannel) {
+ return ChannelProtocolType.GRPC_V2;
+ } else if (channel instanceof RemotingChannel) {
+ return ChannelProtocolType.REMOTING;
+ } else if (channel instanceof RemoteChannel) {
+ RemoteChannel remoteChannel = (RemoteChannel) channel;
+ return remoteChannel.getType();
+ }
+ return ChannelProtocolType.UNKNOWN;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index cbedc3c50..0efca05b4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.ProxyMode;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
public class ProxyConfig implements ConfigFile {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -55,6 +57,12 @@ public class ProxyConfig implements ConfigFile {
private String proxyClusterName = DEFAULT_CLUSTER_NAME;
private String proxyName = StringUtils.isEmpty(localHostName) ? "DEFAULT_PROXY" : localHostName;
+ private String localServeAddr = "";
+
+ private String systemTopicClusterName = "";
+ private int heartbeatSyncerThreadPoolNums = 4;
+ private int heartbeatSyncerThreadPoolQueueCapacity = 100;
+
/**
* configuration for ThreadPoolMonitor
*/
@@ -204,6 +212,15 @@ public class ProxyConfig implements ConfigFile {
@Override
public void initData() {
parseDelayLevel();
+ if (StringUtils.isEmpty(localServeAddr)) {
+ this.localServeAddr = RemotingUtil.getLocalAddress();
+ }
+ if (StringUtils.isBlank(localServeAddr)) {
+ throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "get local serve ip failed");
+ }
+ if (StringUtils.isBlank(systemTopicClusterName)) {
+ this.systemTopicClusterName = this.rocketMQClusterName;
+ }
}
public int computeDelayLevel(long timeMillis) {
@@ -267,6 +284,38 @@ public class ProxyConfig implements ConfigFile {
this.proxyName = proxyName;
}
+ public String getLocalServeAddr() {
+ return localServeAddr;
+ }
+
+ public void setLocalServeAddr(String localServeAddr) {
+ this.localServeAddr = localServeAddr;
+ }
+
+ public String getSystemTopicClusterName() {
+ return systemTopicClusterName;
+ }
+
+ public void setSystemTopicClusterName(String systemTopicClusterName) {
+ this.systemTopicClusterName = systemTopicClusterName;
+ }
+
+ public int getHeartbeatSyncerThreadPoolNums() {
+ return heartbeatSyncerThreadPoolNums;
+ }
+
+ public void setHeartbeatSyncerThreadPoolNums(int heartbeatSyncerThreadPoolNums) {
+ this.heartbeatSyncerThreadPoolNums = heartbeatSyncerThreadPoolNums;
+ }
+
+ public int getHeartbeatSyncerThreadPoolQueueCapacity() {
+ return heartbeatSyncerThreadPoolQueueCapacity;
+ }
+
+ public void setHeartbeatSyncerThreadPoolQueueCapacity(int heartbeatSyncerThreadPoolQueueCapacity) {
+ this.heartbeatSyncerThreadPoolQueueCapacity = heartbeatSyncerThreadPoolQueueCapacity;
+ }
+
public boolean isEnablePrintJstack() {
return enablePrintJstack;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index 81a819007..f30519d74 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -78,7 +78,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme
protected void init(MessagingProcessor messagingProcessor) {
this.grpcClientSettingsManager = new GrpcClientSettingsManager(messagingProcessor);
- this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService());
+ this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService(), this.grpcClientSettingsManager);
this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor);
this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 97a0ae6da..fb6df2562 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -30,12 +30,14 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
public class GrpcChannelManager implements StartAndShutdown {
private final ProxyRelayService proxyRelayService;
+ private final GrpcClientSettingsManager grpcClientSettingsManager;
protected final ConcurrentMap<String, GrpcClientChannel> clientIdChannelMap = new ConcurrentHashMap<>();
protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
@@ -45,8 +47,9 @@ public class GrpcChannelManager implements StartAndShutdown {
new ThreadFactoryImpl("GrpcChannelManager_")
);
- public GrpcChannelManager(ProxyRelayService proxyRelayService) {
+ public GrpcChannelManager(ProxyRelayService proxyRelayService, GrpcClientSettingsManager grpcClientSettingsManager) {
this.proxyRelayService = proxyRelayService;
+ this.grpcClientSettingsManager = grpcClientSettingsManager;
}
protected void init() {
@@ -58,7 +61,7 @@ public class GrpcChannelManager implements StartAndShutdown {
public GrpcClientChannel createChannel(ProxyContext ctx, String clientId) {
return this.clientIdChannelMap.computeIfAbsent(clientId,
- k -> new GrpcClientChannel(proxyRelayService, this, ctx, clientId));
+ k -> new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, this, ctx, clientId));
}
public GrpcClientChannel getChannel(String clientId) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index ec14473fd..714d0bf01 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -18,13 +18,17 @@ package org.apache.rocketmq.proxy.grpc.v2.channel;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
+import com.google.protobuf.util.JsonFormat;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,7 +37,14 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
+import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannelConverter;
import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -45,24 +56,70 @@ import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequest
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
-public class GrpcClientChannel extends ProxyChannel {
+public class GrpcClientChannel extends ProxyChannel implements ChannelExtendAttributeGetter, RemoteChannelConverter {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final GrpcChannelManager grpcChannelManager;
+ private final GrpcClientSettingsManager grpcClientSettingsManager;
private final AtomicReference<StreamObserver<TelemetryCommand>> telemetryCommandRef = new AtomicReference<>();
private final Object telemetryWriteLock = new Object();
private final String clientId;
- public GrpcClientChannel(ProxyRelayService proxyRelayService, GrpcChannelManager grpcChannelManager,
- ProxyContext ctx, String clientId) {
+ public GrpcClientChannel(ProxyRelayService proxyRelayService, GrpcClientSettingsManager grpcClientSettingsManager,
+ GrpcChannelManager grpcChannelManager, ProxyContext ctx, String clientId) {
super(proxyRelayService, null, new GrpcChannelId(clientId),
ctx.getRemoteAddress(),
ctx.getLocalAddress());
this.grpcChannelManager = grpcChannelManager;
+ this.grpcClientSettingsManager = grpcClientSettingsManager;
this.clientId = clientId;
}
+ @Override
+ public String getChannelExtendAttribute() {
+ Settings settings = this.grpcClientSettingsManager.getRawClientSettings(this.clientId);
+ if (settings == null) {
+ return null;
+ }
+ try {
+ return JsonFormat.printer().print(settings);
+ } catch (InvalidProtocolBufferException e) {
+ log.error("convert settings to json data failed. settings:{}", settings, e);
+ }
+ return null;
+ }
+
+ public static Settings parseChannelExtendAttribute(Channel channel) {
+ if (ChannelHelper.getChannelProtocolType(channel).equals(ChannelProtocolType.GRPC_V2) &&
+ channel instanceof ChannelExtendAttributeGetter) {
+ String attr = ((ChannelExtendAttributeGetter) channel).getChannelExtendAttribute();
+ if (attr == null) {
+ return null;
+ }
+
+ Settings.Builder builder = Settings.newBuilder();
+ try {
+ JsonFormat.parser().merge(attr, builder);
+ return builder.build();
+ } catch (InvalidProtocolBufferException e) {
+ log.error("convert settings json data to settings failed. data:{}", attr, e);
+ return null;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public RemoteChannel toRemoteChannel() {
+ return new RemoteChannel(
+ ConfigurationManager.getProxyConfig().getLocalServeAddr(),
+ this.getRemoteAddress(),
+ this.getLocalAddress(),
+ ChannelProtocolType.GRPC_V2,
+ this.getChannelExtendAttribute());
+ }
+
protected static class GrpcChannelId implements ChannelId {
private final String clientId;
@@ -181,14 +238,6 @@ public class GrpcClientChannel extends ProxyChannel {
return clientId;
}
- public String getRemoteAddress() {
- return remoteAddress;
- }
-
- public String getLocalAddress() {
- return localAddress;
- }
-
public void writeTelemetryCommand(TelemetryCommand command) {
StreamObserver<TelemetryCommand> observer = this.telemetryCommandRef.get();
if (observer == null) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 20035f7a1..00cc862a4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -32,6 +32,7 @@ import apache.rocketmq.v2.ThreadStackTrace;
import apache.rocketmq.v2.VerifyMessageResult;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
+import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -39,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
@@ -49,6 +51,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
@@ -209,7 +212,8 @@ public class ClientActivity extends AbstractMessingActivity {
};
}
- protected void processTelemetryException(TelemetryCommand request, Throwable t, StreamObserver<TelemetryCommand> responseObserver) {
+ protected void processTelemetryException(TelemetryCommand request, Throwable t,
+ StreamObserver<TelemetryCommand> responseObserver) {
StatusRuntimeException exception = io.grpc.Status.INTERNAL
.withDescription("process client telemetryCommand failed. " + t.getMessage())
.withCause(t)
@@ -291,7 +295,8 @@ public class ClientActivity extends AbstractMessingActivity {
return channel;
}
- protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) {
+ protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType,
+ List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) {
String clientId = ctx.getClientID();
LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());
@@ -413,14 +418,67 @@ public class ClientActivity extends AbstractMessingActivity {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
- if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
- if (args == null || args.length < 1) {
- return;
+ switch (event) {
+ case CLIENT_UNREGISTER:
+ processClientUnregister(group, args);
+ break;
+ case REGISTER:
+ processClientRegister(group, args);
+ break;
+ default:
+ break;
+ }
+ }
+
+ protected void processClientUnregister(String group, Object... args) {
+ if (args == null || args.length < 1) {
+ return;
+ }
+ if (args[0] instanceof ClientChannelInfo) {
+ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+ String clientId = clientChannelInfo.getClientId();
+ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+ grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
+ if (grpcChannelManager.getChannel(clientId) == null) {
+ // if no channel of this client is connected directly to this proxy
+ return null;
+ }
+ return orgSettings;
+ });
+ } else {
+ grpcChannelManager.removeChannel(clientId);
+ grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
+ ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(group);
+ if (consumerGroupInfo == null) {
+ return null;
+ }
+ List<Channel> allChannels = consumerGroupInfo.getAllChannel();
+ if (allChannels == null || allChannels.isEmpty() || allChannels.size() == 1) {
+ // if there is only one channel of this clientId or
+ // there is no channel of this clientId,
+ // remove the settings of this client
+ return null;
+ }
+ return orgSettings;
+ });
}
- if (args[0] instanceof ClientChannelInfo) {
- ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
- grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
- grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
+ }
+ }
+
+ protected void processClientRegister(String group, Object... args) {
+ if (args == null || args.length < 2) {
+ return;
+ }
+ if (args[1] instanceof ClientChannelInfo) {
+ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[1];
+ Channel channel = clientChannelInfo.getChannel();
+ if (ChannelHelper.isRemote(channel)) {
+ // save the settings from a channel synced from another proxy
+ Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel);
+ if (settings == null) {
+ return;
+ }
+ grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(), settings);
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index 8c3c3a8a4..21c3395ae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -52,6 +53,10 @@ public class GrpcClientSettingsManager {
this.messagingProcessor = messagingProcessor;
}
+ /**
+ * Returns whatever {@code CLIENT_SETTINGS_MAP} currently holds for {@code clientId},
+ * without any further processing.
+ *
+ * @param clientId key to look up
+ * @return the stored settings, or {@code null} when the map returns no value for the key
+ */
+ public Settings getRawClientSettings(String clientId) {
+ return CLIENT_SETTINGS_MAP.get(clientId);
+ }
+
public Settings getClientSettings(ProxyContext ctx) {
String clientId = ctx.getClientID();
Settings settings = CLIENT_SETTINGS_MAP.get(clientId);
@@ -184,6 +189,10 @@ public class GrpcClientSettingsManager {
CLIENT_SETTINGS_MAP.remove(clientId);
}
+ /**
+ * Delegates to {@code CLIENT_SETTINGS_MAP.computeIfPresent}, passing only the current
+ * value (not the key) to {@code function}.
+ * NOTE(review): under standard {@code Map.computeIfPresent} semantics a {@code null}
+ * result from {@code function} removes the entry; confirm against the map's declared type.
+ *
+ * @param clientId key whose settings should be recomputed
+ * @param function maps the current settings to the replacement settings
+ */
+ public void computeIfPresent(String clientId, Function<Settings, Settings> function) {
+ CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) -> function.apply(value));
+ }
+
public Settings removeAndGetClientSettings(ProxyContext ctx) {
String clientId = ctx.getClientID();
Settings settings = CLIENT_SETTINGS_MAP.remove(clientId);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
index 5408fa066..8fb6eaf7d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
@@ -101,6 +101,11 @@ public class ClientProcessor extends AbstractProcessor {
this.serviceManager.getConsumerManager().unregisterConsumer(consumerGroup, clientChannelInfo, false);
}
+ /**
+ * Forwards a channel close event to the consumer manager first and then to the
+ * producer manager obtained from the service manager.
+ *
+ * @param remoteAddr remote address passed through unchanged to both managers
+ * @param channel channel passed through unchanged to both managers
+ */
+ public void doChannelCloseEvent(String remoteAddr, Channel channel) {
+ this.serviceManager.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.serviceManager.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+ }
+
public void registerConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 1b7baba0a..66239f0e8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -274,6 +274,11 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
this.clientProcessor.registerConsumerIdsChangeListener(consumerIdsChangeListener);
}
+ /**
+ * Delegates the channel close event to the client processor.
+ *
+ * @param remoteAddr remote address passed through unchanged
+ * @param channel channel passed through unchanged
+ */
+ @Override
+ public void doChannelCloseEvent(String remoteAddr, Channel channel) {
+ this.clientProcessor.doChannelCloseEvent(remoteAddr, channel);
+ }
+
@Override
public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
return this.clientProcessor.getConsumerGroupInfo(consumerGroup);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 3e8b8084e..3c4e6303f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -285,6 +285,8 @@ public interface MessagingProcessor extends StartAndShutdown {
ConsumerIdsChangeListener consumerIdsChangeListener
);
+ /**
+ * Reports a channel close event to the processor.
+ *
+ * @param remoteAddr remote address associated with the event
+ * @param channel channel the event refers to
+ */
+ void doChannelCloseEvent(String remoteAddr, Channel channel);
+
ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup);
void addTransactionSubscription(
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index d435b0c2e..9d000bfe9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -105,6 +106,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+ // if a channel synced from another proxy has expired, do not clear the data of clients connected to the current proxy
+ return;
+ }
clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java
new file mode 100644
index 000000000..3538a9496
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+ /**
+ * Implemented by channels that can expose an extend attribute as a single string.
+ * NOTE(review): the format of the string is not defined here; it is presumably
+ * protocol-specific, so confirm against the implementing channel classes.
+ */
+ public interface ChannelExtendAttributeGetter {
+
+ /**
+ * @return the extend attribute of this channel as a string
+ */
+ String getChannelExtendAttribute();
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java
new file mode 100644
index 000000000..d2eeb8353
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+ /**
+ * Protocol types a channel can be tagged with; each constant carries a lower-case name.
+ */
+ public enum ChannelProtocolType {
+ UNKNOWN("unknown"),
+ GRPC_V2("grpc_v2"),
+ GRPC_V1("grpc_v1"),
+ REMOTING("remoting");
+
+ // lower-case name supplied by the constant's declaration
+ private final String name;
+
+ ChannelProtocolType(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the lower-case name given in the constant's declaration, e.g. {@code "grpc_v2"}
+ */
+ public String getName() {
+ return name;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
new file mode 100644
index 000000000..5d9c9afcc
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+
+ /**
+  * Channel object describing a connection identified by a remote proxy ip, a remote
+  * address, a local address and a protocol type, plus a mutable extend attribute.
+  * <p>
+  * The channel always reports itself as not writable. It can be turned into JSON with
+  * {@link #encode()} and rebuilt with {@link #decode(String)}.
+  */
+ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttributeGetter {
+     protected final ChannelProtocolType type;
+     protected final String remoteProxyIp;
+     // volatile: may be replaced through setExtendAttribute after construction
+     protected volatile String extendAttribute;
+
+     /**
+      * @param remoteProxyIp   ip of the proxy this channel information came from
+      * @param remoteAddress   remote address handed to the super constructor
+      * @param localAddress    local address handed to the super constructor
+      * @param type            protocol type of the channel
+      * @param extendAttribute initial extend attribute, may be replaced later
+      */
+     public RemoteChannel(String remoteProxyIp, String remoteAddress, String localAddress, ChannelProtocolType type, String extendAttribute) {
+         // first super argument is null; presumably the parent channel (confirm in SimpleChannel)
+         super(null,
+             new RemoteChannelId(remoteProxyIp, remoteAddress, localAddress, type),
+             remoteAddress, localAddress);
+         this.type = type;
+         this.remoteProxyIp = remoteProxyIp;
+         this.extendAttribute = extendAttribute;
+     }
+
+     /**
+      * Channel id built by joining proxy ip, remote address, local address and type with
+      * {@code '@'}. Equality, hashing and ordering are all based on that joined string so
+      * that {@code compareTo} stays consistent with {@code equals}.
+      */
+     public static class RemoteChannelId implements ChannelId {
+
+         private final String id;
+
+         public RemoteChannelId(String remoteProxyIp, String remoteAddress, String localAddress, ChannelProtocolType type) {
+             this.id = remoteProxyIp + "@" + remoteAddress + "@" + localAddress + "@" + type;
+         }
+
+         @Override
+         public String asShortText() {
+             return this.id;
+         }
+
+         @Override
+         public String asLongText() {
+             return this.id;
+         }
+
+         @Override
+         public int compareTo(ChannelId o) {
+             return this.id.compareTo(o.asLongText());
+         }
+
+         @Override
+         public boolean equals(Object other) {
+             if (this == other) {
+                 return true;
+             }
+             if (!(other instanceof RemoteChannelId)) {
+                 return false;
+             }
+             return this.id.equals(((RemoteChannelId) other).id);
+         }
+
+         @Override
+         public int hashCode() {
+             return this.id.hashCode();
+         }
+
+         @Override
+         public String toString() {
+             return this.id;
+         }
+     }
+
+     /**
+      * @return always {@code false}
+      */
+     @Override
+     public boolean isWritable() {
+         return false;
+     }
+
+     public ChannelProtocolType getType() {
+         return type;
+     }
+
+     /**
+      * @return the JSON form produced by {@link RemoteChannelSerializer#toJson(RemoteChannel)}
+      */
+     public String encode() {
+         return RemoteChannelSerializer.toJson(this);
+     }
+
+     /**
+      * @param data JSON text as produced by {@link #encode()}
+      * @return the result of {@link RemoteChannelSerializer#decodeFromJson(String)}
+      */
+     public static RemoteChannel decode(String data) {
+         return RemoteChannelSerializer.decodeFromJson(data);
+     }
+
+     /**
+      * Converts {@code channel} when it implements {@link RemoteChannelConverter}.
+      *
+      * @param channel channel to convert
+      * @return the converted channel, or {@code null} when {@code channel} is not a converter
+      */
+     public static RemoteChannel create(Channel channel) {
+         if (channel instanceof RemoteChannelConverter) {
+             return ((RemoteChannelConverter) channel).toRemoteChannel();
+         }
+         return null;
+     }
+
+     public String getRemoteProxyIp() {
+         return remoteProxyIp;
+     }
+
+     public void setExtendAttribute(String extendAttribute) {
+         this.extendAttribute = extendAttribute;
+     }
+
+     @Override
+     public String getChannelExtendAttribute() {
+         return this.extendAttribute;
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java
new file mode 100644
index 000000000..9f886e85d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+ /**
+ * Implemented by channels that can produce a {@link RemoteChannel} representation of themselves.
+ */
+ public interface RemoteChannelConverter {
+
+ /**
+ * @return a {@link RemoteChannel} describing this channel
+ */
+ RemoteChannel toRemoteChannel();
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
new file mode 100644
index 000000000..8fd216219
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+ /**
+  * Converts {@link RemoteChannel} instances to and from a flat JSON object with the keys
+  * {@code remoteProxyIp}, {@code remoteAddress}, {@code localAddress}, {@code type} and
+  * {@code extendAttribute}.
+  */
+ public class RemoteChannelSerializer {
+     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+     private static final String REMOTE_PROXY_IP_KEY = "remoteProxyIp";
+     private static final String REMOTE_ADDRESS_KEY = "remoteAddress";
+     private static final String LOCAL_ADDRESS_KEY = "localAddress";
+     private static final String TYPE_KEY = "type";
+     private static final String EXTEND_ATTRIBUTE_KEY = "extendAttribute";
+
+     /**
+      * Serializes the five fields of {@code remoteChannel} into a JSON string.
+      *
+      * @param remoteChannel channel to serialize
+      * @return JSON text holding the channel's fields
+      */
+     public static String toJson(RemoteChannel remoteChannel) {
+         Map<String, Object> fields = new HashMap<>();
+         fields.put(REMOTE_PROXY_IP_KEY, remoteChannel.getRemoteProxyIp());
+         fields.put(REMOTE_ADDRESS_KEY, remoteChannel.getRemoteAddress());
+         fields.put(LOCAL_ADDRESS_KEY, remoteChannel.getLocalAddress());
+         fields.put(TYPE_KEY, remoteChannel.getType());
+         fields.put(EXTEND_ATTRIBUTE_KEY, remoteChannel.getChannelExtendAttribute());
+         return JSON.toJSONString(fields);
+     }
+
+     /**
+      * Rebuilds a {@link RemoteChannel} from JSON text.
+      *
+      * @param jsonData JSON text, expected to carry the keys written by {@link #toJson}
+      * @return the decoded channel, or {@code null} when the input is blank or decoding
+      *         throws (the failure is logged)
+      */
+     public static RemoteChannel decodeFromJson(String jsonData) {
+         if (StringUtils.isBlank(jsonData)) {
+             return null;
+         }
+         RemoteChannel decoded = null;
+         try {
+             JSONObject parsed = JSON.parseObject(jsonData);
+             String remoteProxyIp = parsed.getString(REMOTE_PROXY_IP_KEY);
+             String remoteAddress = parsed.getString(REMOTE_ADDRESS_KEY);
+             String localAddress = parsed.getString(LOCAL_ADDRESS_KEY);
+             ChannelProtocolType protocolType = parsed.getObject(TYPE_KEY, ChannelProtocolType.class);
+             String extendAttribute = parsed.getObject(EXTEND_ATTRIBUTE_KEY, String.class);
+             decoded = new RemoteChannel(remoteProxyIp, remoteAddress, localAddress, protocolType, extendAttribute);
+         } catch (Throwable t) {
+             // any failure while parsing or constructing is reported and mapped to null
+             log.error("decode remote channel data failed. data:{}", jsonData, t);
+         }
+         return decoded;
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
new file mode 100644
index 000000000..e213ae855
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+
+ /**
+  * Channel event listener that treats close, exception and idle events alike: each one is
+  * forwarded to {@link ClientManagerActivity#doChannelCloseEvent}. Connect events are ignored.
+  */
+ public class ClientHousekeepingService implements ChannelEventListener {
+
+     private final ClientManagerActivity clientManagerActivity;
+
+     public ClientHousekeepingService(ClientManagerActivity clientManagerActivity) {
+         this.clientManagerActivity = clientManagerActivity;
+     }
+
+     @Override
+     public void onChannelConnect(String remoteAddr, Channel channel) {
+         // nothing to do when a channel connects
+     }
+
+     @Override
+     public void onChannelClose(String remoteAddr, Channel channel) {
+         forwardAsClose(remoteAddr, channel);
+     }
+
+     @Override
+     public void onChannelException(String remoteAddr, Channel channel) {
+         forwardAsClose(remoteAddr, channel);
+     }
+
+     @Override
+     public void onChannelIdle(String remoteAddr, Channel channel) {
+         forwardAsClose(remoteAddr, channel);
+     }
+
+     // Single hand-off point to the activity for every event that ends a channel's use.
+     private void forwardAsClose(String remoteAddr, Channel channel) {
+         this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
+     }
+
+ }
+
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
new file mode 100644
index 000000000..58b257641
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+ /**
+  * Remoting-protocol entry point of the proxy. In this revision the lifecycle hooks are
+  * empty; only {@link #invokeToClient} carries logic.
+  * NOTE(review): {@code defaultRemotingServer} is never assigned in the code shown here, so
+  * {@code invokeToClient} would complete its future exceptionally until it is wired up.
+  */
+ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
+
+     private final MessagingProcessor messagingProcessor;
+     private RemotingServer defaultRemotingServer;
+
+     public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
+         this.messagingProcessor = messagingProcessor;
+     }
+
+     protected void init() {
+
+     }
+
+     protected void registerRemotingServer(RemotingServer remotingServer) {
+
+     }
+
+     @Override
+     public void shutdown() throws Exception {
+
+     }
+
+     @Override
+     public void start() throws Exception {
+
+     }
+
+     /**
+      * Sends {@code request} to the client behind {@code channel} asynchronously.
+      *
+      * @param channel       channel to send the request on
+      * @param request       command to send
+      * @param timeoutMillis timeout handed to the remoting server
+      * @return a future completed with the response command, or completed exceptionally
+      *         when the response is null or when issuing the call throws
+      */
+     @Override
+     public CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request,
+         long timeoutMillis) {
+         CompletableFuture<RemotingCommand> result = new CompletableFuture<>();
+         try {
+             this.defaultRemotingServer.invokeAsync(channel, request, timeoutMillis, responseFuture -> {
+                 RemotingCommand response = responseFuture.getResponseCommand();
+                 if (response != null) {
+                     result.complete(response);
+                 } else {
+                     result.completeExceptionally(new MQClientException("response is null after send request to client", responseFuture.getCause()));
+                 }
+             });
+         } catch (Throwable t) {
+             // surface every failure of the call itself through the future instead of throwing
+             result.completeExceptionally(t);
+         }
+         return result;
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java
new file mode 100644
index 000000000..5a96c41c9
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+ /**
+ * Outbound side of the remoting proxy: sends a command to a client over a given channel.
+ */
+ public interface RemotingProxyOutClient {
+
+ /**
+ * @param channel channel to send the request on
+ * @param request command to send
+ * @param timeoutMillis timeout for the invocation, in milliseconds
+ * @return a future that yields the client's response command
+ */
+ CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request, long timeoutMillis);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
new file mode 100644
index 000000000..62400e033
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ProducerChangeListener;
+import org.apache.rocketmq.broker.client.ProducerGroupEvent;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+ /**
+  * Remoting activity that handles client management requests: heartbeat, client
+  * unregistration and client config checks. It also registers listeners that drop the
+  * matching remoting channel when a consumer or producer is unregistered.
+  */
+ public class ClientManagerActivity extends AbstractRemotingActivity {
+
+     private final RemotingChannelManager remotingChannelManager;
+
+     public ClientManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor,
+         RemotingChannelManager manager) {
+         super(requestPipeline, messagingProcessor);
+         this.remotingChannelManager = manager;
+         this.init();
+     }
+
+     /**
+      * Registers the consumer and producer change listeners on the messaging processor.
+      */
+     protected void init() {
+         this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListenerImpl());
+         this.messagingProcessor.registerProducerListener(new ProducerChangeListenerImpl());
+     }
+
+     /**
+      * Dispatches on the request code.
+      *
+      * @return the handler's response, or {@code null} for request codes not handled here
+      */
+     @Override
+     protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+         ProxyContext context) throws Exception {
+         switch (request.getCode()) {
+             case RequestCode.HEART_BEAT:
+                 return this.heartBeat(ctx, request, context);
+             case RequestCode.UNREGISTER_CLIENT:
+                 return this.unregisterClient(ctx, request, context);
+             case RequestCode.CHECK_CLIENT_CONFIG:
+                 return this.checkClientConfig(ctx, request, context);
+             default:
+                 break;
+         }
+         return null;
+     }
+
+     /**
+      * Registers every producer and consumer listed in the heartbeat body, creating a
+      * remoting channel per group through the channel manager.
+      *
+      * @return a {@code SUCCESS} response with an empty remark
+      */
+     protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request,
+         ProxyContext context) {
+         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+         String clientId = heartbeatData.getClientID();
+
+         for (ProducerData data : heartbeatData.getProducerDataSet()) {
+             ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                 this.remotingChannelManager.createProducerChannel(ctx.channel(), data.getGroupName(), clientId),
+                 clientId, request.getLanguage(),
+                 request.getVersion());
+             messagingProcessor.registerProducer(context, data.getGroupName(), clientChannelInfo);
+         }
+
+         for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
+             ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                 this.remotingChannelManager.createConsumerChannel(ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()),
+                 clientId, request.getLanguage(),
+                 request.getVersion());
+             messagingProcessor.registerConsumer(context, data.getGroupName(), clientChannelInfo, data.getConsumeType(),
+                 data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), true);
+         }
+
+         RemotingCommand response = RemotingCommand.createResponseCommand(null);
+         response.setCode(ResponseCode.SUCCESS);
+         response.setRemark("");
+         return response;
+     }
+
+     /**
+      * Removes the producer and/or consumer channel named in the request header and
+      * unregisters it from the messaging processor.
+      * <p>
+      * When the channel manager hands back no channel for a group, unregistration for that
+      * group is skipped instead of wrapping a {@code null} channel in a
+      * {@link ClientChannelInfo}.
+      *
+      * @return a {@code SUCCESS} response with an empty remark
+      * @throws RemotingCommandException when the request header cannot be decoded
+      */
+     protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request,
+         ProxyContext context) throws RemotingCommandException {
+         final RemotingCommand response = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+         final UnregisterClientRequestHeader requestHeader =
+             (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+         final String producerGroup = requestHeader.getProducerGroup();
+         if (producerGroup != null) {
+             RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel());
+             // presumably null when nothing was registered for this group on this connection — TODO confirm
+             if (channel != null) {
+                 ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                     channel,
+                     requestHeader.getClientID(),
+                     request.getLanguage(),
+                     request.getVersion());
+                 this.messagingProcessor.unRegisterProducer(context, producerGroup, clientChannelInfo);
+             }
+         }
+         final String consumerGroup = requestHeader.getConsumerGroup();
+         if (consumerGroup != null) {
+             RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel());
+             // same guard as for the producer side
+             if (channel != null) {
+                 ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                     channel,
+                     requestHeader.getClientID(),
+                     request.getLanguage(),
+                     request.getVersion());
+                 this.messagingProcessor.unRegisterConsumer(context, consumerGroup, clientChannelInfo);
+             }
+         }
+         response.setCode(ResponseCode.SUCCESS);
+         response.setRemark("");
+         return response;
+     }
+
+     /**
+      * Answers a client config check without inspecting the request.
+      *
+      * @return a {@code SUCCESS} response with an empty remark
+      */
+     protected RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request,
+         ProxyContext context) {
+         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+         response.setCode(ResponseCode.SUCCESS);
+         response.setRemark("");
+         return response;
+     }
+
+     /**
+      * Removes every remoting channel tied to {@code channel} from the channel manager and
+      * reports a close event for each of them to the messaging processor.
+      *
+      * @param remoteAddr remote address passed through to the messaging processor
+      * @param channel    underlying channel that was closed
+      */
+     public void doChannelCloseEvent(String remoteAddr, Channel channel) {
+         Set<RemotingChannel> remotingChannelSet = this.remotingChannelManager.removeChannel(channel);
+         if (remotingChannelSet == null) {
+             // defensive: nothing was tracked for this channel
+             return;
+         }
+         for (RemotingChannel remotingChannel : remotingChannelSet) {
+             this.messagingProcessor.doChannelCloseEvent(remoteAddr, remotingChannel);
+         }
+     }
+
+     /**
+      * On {@code CLIENT_UNREGISTER}, removes the consumer channel of the unregistered client
+      * (taken from {@code args[0]}) from the channel manager. Other events are ignored.
+      */
+     protected class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
+
+         @Override
+         public void handle(ConsumerGroupEvent event, String group, Object... args) {
+             if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+                 if (args == null || args.length < 1) {
+                     return;
+                 }
+                 if (args[0] instanceof ClientChannelInfo) {
+                     ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                     remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel());
+                 }
+             }
+         }
+
+         @Override
+         public void shutdown() {
+
+         }
+     }
+
+     /**
+      * On {@code CLIENT_UNREGISTER}, removes the producer channel of the unregistered client
+      * from the channel manager. Other events, and events without channel info, are ignored.
+      */
+     protected class ProducerChangeListenerImpl implements ProducerChangeListener {
+
+         @Override
+         public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) {
+             if (event == ProducerGroupEvent.CLIENT_UNREGISTER && clientChannelInfo != null) {
+                 remotingChannelManager.removeProducerChannel(group, clientChannelInfo.getChannel());
+             }
+         }
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
new file mode 100644
index 000000000..2b2cfca79
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannelConverter;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.remoting.common.RemotingConverter;
+import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.transaction.TransactionData;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * {@link ProxyChannel} backed by the netty channel of a client that speaks the remoting protocol.
+ * <p>
+ * State queries ({@link #isOpen()}, {@link #isActive()}, {@link #isWritable()}) are delegated to the
+ * wrapped netty channel returned by {@code parent()}. Relay requests are either written straight to
+ * that channel or sent as request/response calls through the {@link RemotingProxyOutClient}.
+ */
+public class RemotingChannel extends ProxyChannel implements RemoteChannelConverter, ChannelExtendAttributeGetter {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    /** Timeout, in milliseconds, for request/response calls made to the client. */
+    private static final long DEFAULT_MQ_CLIENT_TIMEOUT = Duration.ofSeconds(3).toMillis();
+    private final String clientId;
+    private final RemotingProxyOutClient remotingProxyOutClient;
+    private final Set<SubscriptionData> subscriptionData;
+
+    /**
+     * Creates a channel wrapping {@code parent}.
+     *
+     * @param remotingProxyOutClient client used for request/response calls to the remote client
+     * @param proxyRelayService      relay service handed to the superclass
+     * @param parent                 the raw netty channel; its id and addresses are handed to the superclass
+     * @param clientId               id of the connected client
+     * @param subscriptionData       subscriptions of the client; may be {@code null}
+     */
+    public RemotingChannel(RemotingProxyOutClient remotingProxyOutClient, ProxyRelayService proxyRelayService,
+        Channel parent,
+        String clientId, Set<SubscriptionData> subscriptionData) {
+        super(proxyRelayService, parent, parent.id(),
+            RemotingUtil.socketAddress2String(parent.remoteAddress()),
+            RemotingUtil.socketAddress2String(parent.localAddress()));
+        this.remotingProxyOutClient = remotingProxyOutClient;
+        this.clientId = clientId;
+        this.subscriptionData = subscriptionData;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.parent().isOpen();
+    }
+
+    @Override
+    public boolean isActive() {
+        return this.parent().isActive();
+    }
+
+    @Override
+    public boolean isWritable() {
+        return this.parent().isWritable();
+    }
+
+    /**
+     * Writes {@code msg} to the wrapped channel without waiting for the outcome.
+     *
+     * @return an already-completed future; it does not reflect whether the write succeeded
+     */
+    @Override
+    protected CompletableFuture<Void> processOtherMessage(Object msg) {
+        this.parent().writeAndFlush(msg);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Sends a CHECK_TRANSACTION_STATE request to the client as a one-way write.
+     * <p>
+     * Both {@code responseFuture} and the returned future complete once the write has been flushed, or
+     * fail with a {@link RemotingException} that carries the underlying write failure as its cause.
+     *
+     * @return a future that tracks the outcome of the write
+     */
+    @Override
+    protected CompletableFuture<Void> processCheckTransaction(CheckTransactionStateRequestHeader header,
+        MessageExt messageExt, TransactionData transactionData,
+        CompletableFuture<ProxyRelayResult<Void>> responseFuture) {
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        try {
+            CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
+            requestHeader.setCommitLogOffset(transactionData.getCommitLogOffset());
+            requestHeader.setTranStateTableOffset(transactionData.getTranStateTableOffset());
+            requestHeader.setTransactionId(transactionData.getTransactionId());
+            requestHeader.setMsgId(header.getMsgId());
+
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+            request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt));
+
+            this.parent().writeAndFlush(request).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    responseFuture.complete(null);
+                    writeFuture.complete(null);
+                } else {
+                    // keep the netty failure as the cause so the real reason is not lost
+                    failFutures(new RemotingException("write and flush data failed", future.cause()),
+                        responseFuture, writeFuture);
+                }
+            });
+        } catch (Throwable t) {
+            failFutures(t, responseFuture, writeFuture);
+        }
+        return writeFuture;
+    }
+
+    /**
+     * Asks the client for its running info and relays the answer through {@code responseFuture}.
+     * <p>
+     * {@code responseFuture} is always completed: with the decoded info on a SUCCESS response, otherwise
+     * exceptionally (non-success response code, failed or timed-out invocation, or a decoding error).
+     *
+     * @return a future that completes normally on success and exceptionally on any of the failures above
+     */
+    @Override
+    protected CompletableFuture<Void> processGetConsumerRunningInfo(RemotingCommand command,
+        GetConsumerRunningInfoRequestHeader header,
+        CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> responseFuture) {
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        try {
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header);
+            this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT)
+                .whenComplete((response, invokeError) -> {
+                    if (invokeError != null) {
+                        // the invocation itself failed: the waiting relay must still be released
+                        failFutures(invokeError, responseFuture, writeFuture);
+                        return;
+                    }
+                    try {
+                        if (response.getCode() == ResponseCode.SUCCESS) {
+                            ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
+                            responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo));
+                            writeFuture.complete(null);
+                            return;
+                        }
+                        String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark());
+                        failFutures(new RuntimeException(errMsg), responseFuture, writeFuture);
+                    } catch (Throwable handleError) {
+                        failFutures(handleError, responseFuture, writeFuture);
+                    }
+                });
+        } catch (Throwable t) {
+            failFutures(t, responseFuture, writeFuture);
+        }
+        return writeFuture;
+    }
+
+    /**
+     * Asks the client to consume {@code messageExt} directly and relays the result through
+     * {@code responseFuture}.
+     * <p>
+     * {@code responseFuture} is always completed: with the decoded result on a SUCCESS response, otherwise
+     * exceptionally (non-success response code, failed or timed-out invocation, or a decoding error).
+     *
+     * @return a future that completes normally on success and exceptionally on any of the failures above
+     */
+    @Override
+    protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command,
+        ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt,
+        CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) {
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        try {
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header);
+            request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt));
+
+            this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT)
+                .whenComplete((response, invokeError) -> {
+                    if (invokeError != null) {
+                        // the invocation itself failed: the waiting relay must still be released
+                        failFutures(invokeError, responseFuture, writeFuture);
+                        return;
+                    }
+                    try {
+                        if (response.getCode() == ResponseCode.SUCCESS) {
+                            ConsumeMessageDirectlyResult result = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class);
+                            responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", result));
+                            writeFuture.complete(null);
+                            return;
+                        }
+                        String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
+                        failFutures(new RuntimeException(errMsg), responseFuture, writeFuture);
+                    } catch (Throwable handleError) {
+                        failFutures(handleError, responseFuture, writeFuture);
+                    }
+                });
+        } catch (Throwable t) {
+            failFutures(t, responseFuture, writeFuture);
+        }
+        return writeFuture;
+    }
+
+    /** Fails both futures with the same cause; a future that is already completed is left untouched. */
+    private static void failFutures(Throwable cause, CompletableFuture<?> responseFuture,
+        CompletableFuture<Void> writeFuture) {
+        responseFuture.completeExceptionally(cause);
+        writeFuture.completeExceptionally(cause);
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the subscription set serialized as JSON, or {@code null} when there is no subscription set
+     */
+    @Override
+    public String getChannelExtendAttribute() {
+        if (this.subscriptionData == null) {
+            return null;
+        }
+        return JSON.toJSONString(this.subscriptionData);
+    }
+
+    /**
+     * Reads the subscription set back from the extend attribute of a remoting-protocol channel.
+     *
+     * @param channel the channel to inspect
+     * @return the parsed subscriptions, or {@code null} when the channel is not a remoting channel exposing
+     * an extend attribute, when the attribute is absent, or when it cannot be parsed
+     */
+    public static Set<SubscriptionData> parseChannelExtendAttribute(Channel channel) {
+        // constant-first comparison: a null protocol type means "not remoting" instead of an NPE
+        if (ChannelProtocolType.REMOTING.equals(ChannelHelper.getChannelProtocolType(channel)) &&
+            channel instanceof ChannelExtendAttributeGetter) {
+            String attr = ((ChannelExtendAttributeGetter) channel).getChannelExtendAttribute();
+            if (attr == null) {
+                return null;
+            }
+
+            try {
+                return JSON.parseObject(attr, new TypeReference<Set<SubscriptionData>>() {
+                });
+            } catch (Exception e) {
+                log.error("convert remoting extend attribute to subscriptionDataSet failed. data:{}", attr, e);
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return a {@link RemoteChannel} built from the proxy's local serve address, this channel's remote and
+     * local addresses, the REMOTING protocol type and the JSON extend attribute
+     */
+    @Override
+    public RemoteChannel toRemoteChannel() {
+        return new RemoteChannel(
+            ConfigurationManager.getProxyConfig().getLocalServeAddr(),
+            this.getRemoteAddress(),
+            this.getLocalAddress(),
+            ChannelProtocolType.REMOTING,
+            this.getChannelExtendAttribute());
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
new file mode 100644
index 000000000..bdc6457e7
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import io.netty.channel.Channel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of {@link RemotingChannel}s, indexed first by a group key and then by the raw netty channel.
+ * <p>
+ * Producer and consumer groups share one map; their keys are told apart by a one-character prefix
+ * ({@code "p"} for producers, {@code "c"} for consumers). All structural changes to a group's entry go
+ * through {@code compute}/{@code computeIfPresent} on the outer map, and a group entry is dropped as soon
+ * as its last channel is removed.
+ */
+public class RemotingChannelManager implements StartAndShutdown {
+    protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private final ProxyRelayService proxyRelayService;
+    protected final ConcurrentMap<String /* group */, Map<Channel /* raw channel */, RemotingChannel>> groupChannelMap = new ConcurrentHashMap<>();
+
+    private final RemotingProxyOutClient remotingProxyOutClient;
+
+    public RemotingChannelManager(RemotingProxyOutClient remotingProxyOutClient, ProxyRelayService proxyRelayService) {
+        this.remotingProxyOutClient = remotingProxyOutClient;
+        this.proxyRelayService = proxyRelayService;
+    }
+
+    protected String buildProducerKey(String group) {
+        return buildKey("p", group);
+    }
+
+    protected String buildConsumerKey(String group) {
+        return buildKey("c", group);
+    }
+
+    protected String buildKey(String prefix, String group) {
+        return prefix + group;
+    }
+
+    /** Strips the one-character prefix added by {@link #buildKey(String, String)}. */
+    protected String getGroupFromKey(String key) {
+        return key.substring(1);
+    }
+
+    /**
+     * Returns the producer channel registered for {@code channel} in {@code group}, creating it with an
+     * empty subscription set when absent.
+     */
+    public RemotingChannel createProducerChannel(Channel channel, String group, String clientId) {
+        return createChannel(channel, buildProducerKey(group), clientId, Collections.emptySet());
+    }
+
+    /**
+     * Returns the consumer channel registered for {@code channel} in {@code group}, creating it with the
+     * given subscriptions when absent. An existing channel is returned unchanged.
+     */
+    public RemotingChannel createConsumerChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) {
+        return createChannel(channel, buildConsumerKey(group), clientId, subscriptionData);
+    }
+
+    /**
+     * Get-or-create under the given group key.
+     *
+     * @param group the prefixed group key (see {@link #buildKey(String, String)})
+     * @return the channel that is registered once the update has run; never {@code null}
+     */
+    protected RemotingChannel createChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) {
+        // Capture the result inside compute(): re-reading the map afterwards could observe a concurrent
+        // removal and hand null back to the caller.
+        AtomicReference<RemotingChannel> channelRef = new AtomicReference<>();
+        this.groupChannelMap.compute(group, (groupKey, existingMap) -> {
+            Map<Channel, RemotingChannel> channelMap = existingMap != null ? existingMap : new ConcurrentHashMap<>();
+            channelRef.set(channelMap.computeIfAbsent(channel,
+                rawChannel -> new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionData)));
+            return channelMap;
+        });
+        return channelRef.get();
+    }
+
+    public RemotingChannel getConsumerChannel(String group, Channel channel) {
+        return getChannel(buildConsumerKey(group), channel);
+    }
+
+    public RemotingChannel getProducerChannel(String group, Channel channel) {
+        return getChannel(buildProducerKey(group), channel);
+    }
+
+    /**
+     * @param group the prefixed group key
+     * @return the registered channel, or {@code null} when the group or the channel is unknown
+     */
+    protected RemotingChannel getChannel(String group, Channel channel) {
+        Map<Channel, RemotingChannel> clientIdChannelMap = this.groupChannelMap.get(group);
+        if (clientIdChannelMap == null) {
+            return null;
+        }
+        return clientIdChannelMap.get(channel);
+    }
+
+    /**
+     * Removes {@code channel} from every group it is registered in.
+     *
+     * @return the removed channels; empty when the raw channel was not registered anywhere
+     */
+    public Set<RemotingChannel> removeChannel(Channel channel) {
+        Set<RemotingChannel> removedChannelSet = new HashSet<>();
+        // Go through the per-key removal so that emptied group entries are pruned and the update happens
+        // under the outer map's compute, exactly like the single-group removal.
+        for (String groupKey : this.groupChannelMap.keySet()) {
+            RemotingChannel remotingChannel = removeChannel(groupKey, channel);
+            if (remotingChannel != null) {
+                removedChannelSet.add(remotingChannel);
+            }
+        }
+        return removedChannelSet;
+    }
+
+    public RemotingChannel removeProducerChannel(String group, Channel channel) {
+        return removeChannel(buildProducerKey(group), channel);
+    }
+
+    public RemotingChannel removeConsumerChannel(String group, Channel channel) {
+        return removeChannel(buildConsumerKey(group), channel);
+    }
+
+    /**
+     * Removes {@code channel} from the entry of the given group key and drops the entry once it is empty.
+     *
+     * @param group the prefixed group key
+     * @return the removed channel, or {@code null} when nothing was registered
+     */
+    protected RemotingChannel removeChannel(String group, Channel channel) {
+        AtomicReference<RemotingChannel> channelRef = new AtomicReference<>();
+
+        this.groupChannelMap.computeIfPresent(group, (groupKey, channelMap) -> {
+            channelRef.set(channelMap.remove(channel));
+            if (channelMap.isEmpty()) {
+                return null;
+            }
+            return channelMap;
+        });
+        return channelRef.get();
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
new file mode 100644
index 000000000..55af1ff19
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.common;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Converts {@link MessageExt} instances into the byte form produced by {@link MessageDecoder#encode}.
+ * <p>
+ * A shared instance is available through {@link #getInstance()}; it is created lazily on first use.
+ */
+public class RemotingConverter {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
+    protected static final Object INSTANCE_CREATE_LOCK = new Object();
+    protected static volatile RemotingConverter instance;
+
+    /**
+     * @return the shared converter, creating it under {@code INSTANCE_CREATE_LOCK} if it does not exist yet
+     */
+    public static RemotingConverter getInstance() {
+        if (instance == null) {
+            synchronized (INSTANCE_CREATE_LOCK) {
+                if (instance == null) {
+                    instance = new RemotingConverter();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Encodes every message and concatenates the results in list order.
+     * <p>
+     * Conversion is best effort: a message that cannot be encoded is logged and left out of the result.
+     *
+     * @param msgList messages to encode
+     * @return the concatenated encodings; an empty array when nothing could be encoded
+     */
+    public byte[] convertMsgToBytes(List<MessageExt> msgList) {
+        // set response body
+        byte[][] msgBufferList = new byte[msgList.size()][];
+        int bodyTotalSize = 0;
+        for (int i = 0; i < msgList.size(); i++) {
+            try {
+                msgBufferList[i] = convertMsgToBytes(msgList.get(i));
+                bodyTotalSize += msgBufferList[i].length;
+            } catch (Exception e) {
+                log.error("messageToByteBuffer UnsupportedEncodingException", e);
+            }
+        }
+
+        ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
+        for (byte[] bb : msgBufferList) {
+            // a failed conversion leaves its slot null; ByteBuffer.put(null) would throw
+            if (bb == null) {
+                continue;
+            }
+            body.put(bb);
+        }
+
+        return body.array();
+    }
+
+    /**
+     * Encodes a single message. The store size of {@code msg} is reset to 0 before encoding, so the
+     * argument is modified. A topic longer than {@link Byte#MAX_VALUE} characters is only logged as a
+     * warning; encoding is still attempted.
+     *
+     * @param msg the message to encode
+     * @return the encoded bytes
+     * @throws Exception if encoding fails
+     */
+    public byte[] convertMsgToBytes(final MessageExt msg) throws Exception {
+        // change to 0 for recalculate storeSize
+        msg.setStoreSize(0);
+        if (msg.getTopic().length() > Byte.MAX_VALUE) {
+            log.warn("Topic length is too long, topic: {}", msg.getTopic());
+        }
+        return MessageDecoder.encode(msg, false);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index ac1ff6a88..24b27aaa2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -32,6 +32,9 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
+import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager;
import org.apache.rocketmq.proxy.service.message.ClusterMessageService;
import org.apache.rocketmq.proxy.service.message.MessageService;
import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService;
@@ -52,11 +55,12 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
protected ClusterTransactionService clusterTransactionService;
protected ProducerManager producerManager;
- protected ConsumerManager consumerManager;
+ protected ClusterConsumerManager consumerManager;
protected TopicRouteService topicRouteService;
protected MessageService messageService;
protected ProxyRelayService proxyRelayService;
protected ClusterMetadataService metadataService;
+ protected AdminService adminService;
protected ScheduledExecutorService scheduledExecutorService;
protected MQClientAPIFactory messagingClientAPIFactory;
@@ -67,7 +71,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
this.producerManager = new ProducerManager();
- this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
+ this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
this.messagingClientAPIFactory = new MQClientAPIFactory(
"ClusterMQClient_",
@@ -76,7 +80,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
rpcHook,
scheduledExecutorService);
this.operationClientAPIFactory = new MQClientAPIFactory(
- "TopicRouteServiceClient_",
+ "OperationClient_",
1,
new DoNothingClientRemotingProcessor(null),
rpcHook,
@@ -95,6 +99,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
this.transactionClientAPIFactory);
this.proxyRelayService = new ClusterProxyRelayService(this.clusterTransactionService);
this.metadataService = new ClusterMetadataService(topicRouteService, operationClientAPIFactory);
+ this.adminService = new DefaultAdminService(this.operationClientAPIFactory);
this.init();
}
@@ -118,6 +123,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
this.appendStartAndShutdown(this.topicRouteService);
this.appendStartAndShutdown(this.clusterTransactionService);
this.appendStartAndShutdown(this.metadataService);
+ this.appendStartAndShutdown(this.consumerManager);
}
@Override
@@ -155,6 +161,11 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
return this.metadataService;
}
+ /**
+ * Returns the value of the {@code adminService} field.
+ */
+ @Override
+ public AdminService getAdminService() {
+ return this.adminService;
+ }
+
protected static class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
index 6afc86c57..4f829caa6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
@@ -25,6 +25,8 @@ import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
import org.apache.rocketmq.proxy.service.channel.ChannelManager;
import org.apache.rocketmq.proxy.service.message.LocalMessageService;
import org.apache.rocketmq.proxy.service.message.MessageService;
@@ -48,6 +50,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
private final TransactionService transactionService;
private final ProxyRelayService proxyRelayService;
private final MetadataService metadataService;
+ private final AdminService adminService;
private final MQClientAPIFactory mqClientAPIFactory;
private final ChannelManager channelManager;
@@ -60,7 +63,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
this.channelManager = new ChannelManager();
this.messageService = new LocalMessageService(brokerController, channelManager, rpcHook);
this.mqClientAPIFactory = new MQClientAPIFactory(
- "TopicRouteServiceClient_",
+ "LocalMQClient_",
1,
new DoNothingClientRemotingProcessor(null),
rpcHook,
@@ -70,6 +73,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
this.transactionService = new LocalTransactionService(brokerController.getBrokerConfig());
this.proxyRelayService = new LocalProxyRelayService(brokerController, this.transactionService);
this.metadataService = new LocalMetadataService(brokerController);
+ this.adminService = new DefaultAdminService(this.mqClientAPIFactory);
this.init();
}
@@ -114,6 +118,11 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
return this.metadataService;
}
+ /**
+ * Returns the value of the {@code adminService} field.
+ */
+ @Override
+ public AdminService getAdminService() {
+ return this.adminService;
+ }
+
private class LocalServiceManagerStartAndShutdown implements StartAndShutdown {
@Override
public void start() throws Exception {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
index 563b56715..ce84832ca 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
@@ -16,9 +16,10 @@
*/
package org.apache.rocketmq.proxy.service;
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.broker.client.ProducerManagerInterface;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.message.MessageService;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -30,13 +31,15 @@ public interface ServiceManager extends StartAndShutdown {
TopicRouteService getTopicRouteService();
- ProducerManager getProducerManager();
+ ProducerManagerInterface getProducerManager();
- ConsumerManager getConsumerManager();
+ ConsumerManagerInterface getConsumerManager();
TransactionService getTransactionService();
ProxyRelayService getProxyRelayService();
MetadataService getMetadataService();
+
+ AdminService getAdminService();
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
new file mode 100644
index 000000000..d98d17ff7
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.admin;
+
+import java.util.List;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+
+/**
+ * Topic administration operations: existence check and topic creation on brokers.
+ */
+public interface AdminService {
+
+ /**
+ * Reports whether {@code topic} exists.
+ * NOTE(review): DefaultAdminService answers by fetching the topic route from the name server and
+ * reports {@code false} on any failure; confirm that other implementations are expected to do the same.
+ */
+ boolean topicExist(String topic);
+
+ /**
+ * Creates {@code createTopic} on the brokers that serve {@code sampleTopic}.
+ * NOTE(review): the semantics below are read from DefaultAdminService, not from a written contract.
+ * There, brokers that already serve {@code createTopic} are skipped, failures are logged and reported
+ * as {@code false}, and {@code examineTopic}/{@code retryCheckCount} are passed on to
+ * {@link #createTopicOnBroker}.
+ */
+ boolean createTopicOnTopicBrokerIfNotExist(String createTopic, String sampleTopic, int wQueueNum,
+ int rQueueNum, boolean examineTopic, int retryCheckCount);
+
+ /**
+ * Creates {@code topic} with the given write/read queue numbers on brokers taken from
+ * {@code sampleBrokerDataList}.
+ * NOTE(review): in DefaultAdminService, brokers whose master address appears in
+ * {@code curBrokerDataList} are skipped, and when {@code examineTopic} is set the result is whether
+ * {@link #topicExist} succeeds within {@code retryCheckCount} attempts; confirm this is the intended
+ * contract for all implementations.
+ */
+ boolean createTopicOnBroker(String topic, int wQueueNum, int rQueueNum, List<BrokerData> curBrokerDataList,
+ List<BrokerData> sampleBrokerDataList, boolean examineTopic, int retryCheckCount) throws Exception;
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
new file mode 100644
index 000000000..4f3b407d6
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.admin;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteHelper;
+
+/**
+ * {@link AdminService} implementation that talks to the name server and to brokers through the
+ * {@link MQClientAPIExt} supplied by an {@link MQClientAPIFactory}.
+ */
+public class DefaultAdminService implements AdminService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    /** Timeout, in milliseconds, applied to every remote call issued by this service. */
+    private static final long REMOTE_CALL_TIMEOUT_MILLIS = Duration.ofSeconds(3).toMillis();
+    private final MQClientAPIFactory mqClientAPIFactory;
+
+    public DefaultAdminService(MQClientAPIFactory mqClientAPIFactory) {
+        this.mqClientAPIFactory = mqClientAPIFactory;
+    }
+
+    /**
+     * Checks topic existence by fetching its route from the name server.
+     *
+     * @return {@code true} when a route was returned; {@code false} when none was returned or the lookup
+     * failed for any reason
+     */
+    @Override
+    public boolean topicExist(String topic) {
+        try {
+            return this.getTopicRouteDataDirectlyFromNameServer(topic) != null;
+        } catch (Throwable ignored) {
+            // deliberate best effort: any lookup failure is reported as "does not exist"
+            return false;
+        }
+    }
+
+    /**
+     * Creates {@code createTopic} on the brokers that serve {@code sampleTopic}, skipping brokers that
+     * already serve {@code createTopic}.
+     *
+     * @param createTopic     topic to create
+     * @param sampleTopic     topic whose brokers are the creation targets
+     * @param wQueueNum       write queue number of the new topic
+     * @param rQueueNum       read queue number of the new topic
+     * @param examineTopic    whether to verify through the name server that the topic exists afterwards
+     * @param retryCheckCount maximum number of verification attempts
+     * @return the result of {@link #createTopicOnBroker}; {@code false} when a route lookup fails (other
+     * than "topic does not exist" for {@code createTopic}), when the sample topic has no brokers, or when
+     * creation throws
+     */
+    @Override
+    public boolean createTopicOnTopicBrokerIfNotExist(String createTopic, String sampleTopic, int wQueueNum,
+        int rQueueNum, boolean examineTopic, int retryCheckCount) {
+        TopicRouteData curTopicRouteData = new TopicRouteData();
+        try {
+            TopicRouteData fetched = this.getTopicRouteDataDirectlyFromNameServer(createTopic);
+            // a null route means "no current brokers"; keep the empty default instead of failing below
+            if (fetched != null) {
+                curTopicRouteData = fetched;
+            }
+        } catch (Exception e) {
+            if (!TopicRouteHelper.isTopicNotExistError(e)) {
+                log.error("get cur topic route {} failed.", createTopic, e);
+                return false;
+            }
+        }
+
+        TopicRouteData sampleTopicRouteData;
+        try {
+            sampleTopicRouteData = this.getTopicRouteDataDirectlyFromNameServer(sampleTopic);
+        } catch (Exception e) {
+            log.error("create topic {} failed.", createTopic, e);
+            return false;
+        }
+
+        if (sampleTopicRouteData == null) {
+            return false;
+        }
+        List<BrokerData> sampleBrokerDataList = sampleTopicRouteData.getBrokerDatas();
+        if (sampleBrokerDataList == null || sampleBrokerDataList.isEmpty()) {
+            return false;
+        }
+
+        try {
+            return this.createTopicOnBroker(createTopic, wQueueNum, rQueueNum, curTopicRouteData.getBrokerDatas(),
+                sampleBrokerDataList, examineTopic, retryCheckCount);
+        } catch (Exception e) {
+            log.error("create topic {} failed.", createTopic, e);
+        }
+        return false;
+    }
+
+    /**
+     * Creates {@code topic} (read and write permission) on the master of every sample broker whose master
+     * address is not already among the current brokers. Sample brokers without a master address are
+     * skipped.
+     *
+     * @param curBrokerDataList    brokers that already serve the topic; may be {@code null}
+     * @param sampleBrokerDataList candidate brokers to create the topic on
+     * @param examineTopic         when {@code true}, the result is whether {@link #topicExist} succeeds
+     *                             within {@code retryCheckCount} attempts
+     * @param retryCheckCount      maximum number of existence checks
+     * @return {@code true} when {@code examineTopic} is {@code false}, otherwise the outcome of the checks
+     * @throws Exception if a create request to a broker fails
+     */
+    @Override
+    public boolean createTopicOnBroker(String topic, int wQueueNum, int rQueueNum, List<BrokerData> curBrokerDataList,
+        List<BrokerData> sampleBrokerDataList, boolean examineTopic, int retryCheckCount) throws Exception {
+        Set<String> curBrokerAddr = new HashSet<>();
+        if (curBrokerDataList != null) {
+            for (BrokerData brokerData : curBrokerDataList) {
+                // same null-safety as for the sample brokers below
+                String addr = brokerData.getBrokerAddrs() == null ? null : brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                if (addr != null) {
+                    curBrokerAddr.add(addr);
+                }
+            }
+        }
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setWriteQueueNums(wQueueNum);
+        topicConfig.setReadQueueNums(rQueueNum);
+        topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+
+        for (BrokerData brokerData : sampleBrokerDataList) {
+            String addr = brokerData.getBrokerAddrs() == null ? null : brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+            if (addr == null) {
+                continue;
+            }
+            if (curBrokerAddr.contains(addr)) {
+                continue;
+            }
+
+            this.getClient().createTopic(addr, TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, REMOTE_CALL_TIMEOUT_MILLIS);
+        }
+
+        if (examineTopic) {
+            // examine topic exist.
+            int count = retryCheckCount;
+            while (count-- > 0) {
+                if (this.topicExist(topic)) {
+                    return true;
+                }
+            }
+        } else {
+            return true;
+        }
+        return false;
+    }
+
+    /** Fetches the route of {@code topic} from the name server with the default timeout. */
+    protected TopicRouteData getTopicRouteDataDirectlyFromNameServer(String topic) throws Exception {
+        return this.getClient().getTopicRouteInfoFromNameServer(topic, REMOTE_CALL_TIMEOUT_MILLIS);
+    }
+
+    protected MQClientAPIExt getClient() {
+        return this.mqClientAPIFactory.getClient();
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
index 04ad5e269..65c1fd406 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
@@ -196,6 +196,14 @@ public class SimpleChannel extends AbstractChannel {
}
+ /**
+  * @return the remote address string held by this channel
+  */
+ public String getRemoteAddress() {
+     return this.remoteAddress;
+ }
+
+ /**
+  * @return the local address string held by this channel
+  */
+ public String getLocalAddress() {
+     return this.localAddress;
+ }
+
public ChannelHandlerContext getChannelHandlerContext() {
return channelHandlerContext;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
new file mode 100644
index 000000000..3bb65b03e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.client;
+
+import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+
+ /**
+  * {@link ConsumerManager} variant that reports every consumer register/unregister call to a
+  * {@link HeartbeatSyncer} before applying it locally, and starts/stops that syncer together
+  * with itself.
+  */
+ public class ClusterConsumerManager extends ConsumerManager implements ConsumerManagerInterface, StartAndShutdown {
+
+     /** Syncer notified about local consumer registrations and un-registrations. */
+     protected HeartbeatSyncer heartbeatSyncer;
+
+     /**
+      * Builds the manager and the heartbeat syncer bound to it.
+      *
+      * @param topicRouteService passed through to the syncer
+      * @param adminService passed through to the syncer
+      * @param mqClientAPIFactory passed through to the syncer
+      * @param consumerIdsChangeListener passed to the parent {@link ConsumerManager}
+      * @param channelExpiredTimeout passed to the parent {@link ConsumerManager}
+      */
+     public ClusterConsumerManager(TopicRouteService topicRouteService, AdminService adminService,
+         MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener consumerIdsChangeListener, long channelExpiredTimeout) {
+         super(consumerIdsChangeListener, channelExpiredTimeout);
+         this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, this, mqClientAPIFactory);
+     }
+
+     /**
+      * Notifies the syncer first, then delegates the registration to the parent implementation.
+      *
+      * @return whatever the parent implementation returns
+      */
+     @Override
+     public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
+         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+         Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
+         this.heartbeatSyncer.onConsumerRegister(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList);
+         boolean registered = super.registerConsumer(group, clientChannelInfo, consumeType, messageModel,
+             consumeFromWhere, subList, isNotifyConsumerIdsChangedEnable, updateSubscription);
+         return registered;
+     }
+
+     /**
+      * Notifies the syncer first, then delegates the un-registration to the parent implementation.
+      */
+     @Override
+     public void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo,
+         boolean isNotifyConsumerIdsChangedEnable) {
+         this.heartbeatSyncer.onConsumerUnRegister(group, clientChannelInfo);
+         super.unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
+     }
+
+     /** Starts the heartbeat syncer. */
+     @Override
+     public void start() throws Exception {
+         this.heartbeatSyncer.start();
+     }
+
+     /** Shuts the heartbeat syncer down. */
+     @Override
+     public void shutdown() throws Exception {
+         this.heartbeatSyncer.shutdown();
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
new file mode 100644
index 000000000..72593525e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.RPCHook;
+
+ /**
+  * Base class for components that exchange "system messages" through a dedicated topic:
+  * {@link #sendSystemMessage(Object)} publishes a JSON payload to the topic, and a
+  * {@link DefaultMQPushConsumer} created in {@link #start()} subscribes to the same topic in
+  * {@link MessageModel#BROADCASTING} mode with this object as its message listener.
+  *
+  * <p>Subclasses implement the {@link MessageListenerConcurrently} callback and may override
+  * the {@code get*} hooks to customise names, tags, queue count or the RPC hook.
+  */
+ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, MessageListenerConcurrently {
+     protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+     protected final TopicRouteService topicRouteService;
+     protected final AdminService adminService;
+     /** Resource name computed once in the constructor; topic, producer id and consumer id derive from it. */
+     protected final String systemResourceName;
+     protected final MQClientAPIFactory mqClientAPIFactory;
+     /** Created by {@link #start()}; {@code null} until then. */
+     protected DefaultMQPushConsumer defaultMQPushConsumer;
+
+     public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, AdminService adminService, MQClientAPIFactory mqClientAPIFactory) {
+         this.topicRouteService = topicRouteService;
+         this.adminService = adminService;
+         this.mqClientAPIFactory = mqClientAPIFactory;
+
+         // NOTE(review): overridable call from a constructor; an override must not rely on
+         // subclass fields, which are not initialised yet at this point.
+         this.systemResourceName = this.getSystemResourceName();
+     }
+
+     /**
+      * Builds the resource name from the system topic prefix, the literal {@code "proxy_"}, the
+      * simple name of the concrete class and the configured proxy cluster name.
+      */
+     protected String getSystemResourceName() {
+         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+         return TopicValidator.SYSTEM_TOPIC_PREFIX + "proxy_" + this.getClass().getSimpleName() + "_" + proxyConfig.getProxyClusterName();
+     }
+
+     /** Producer group used in the send request header. */
+     protected String getSystemMessageProducerId() {
+         return "PID_" + this.systemResourceName;
+     }
+
+     /** Consumer group of the push consumer created in {@link #start()}. */
+     protected String getSystemMessageConsumerId() {
+         return "CID_" + this.systemResourceName;
+     }
+
+     /** Topic that system messages are published to and consumed from. */
+     protected String getBroadcastTopicName() {
+         return this.systemResourceName;
+     }
+
+     /** Subscription expression passed to the consumer; {@code "*"} by default. */
+     protected String getSubTag() {
+         return "*";
+     }
+
+     /** Cluster on which the topic is created, read from the proxy config. */
+     protected String getBroadcastTopicClusterName() {
+         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+         return proxyConfig.getSystemTopicClusterName();
+     }
+
+     /** Queue count used for both queue-number arguments when creating the topic. */
+     protected int getBroadcastTopicQueueNum() {
+         return 1;
+     }
+
+     /** RPC hook handed to the push consumer; {@code null} by default. */
+     protected RPCHook getRpcHook() {
+         return null;
+     }
+
+     /**
+      * Serialises {@code data} to JSON and sends it asynchronously to one queue of the broadcast
+      * topic. Failures, synchronous or asynchronous, are logged and never propagated.
+      *
+      * @param data payload to serialise
+      */
+     protected void sendSystemMessage(Object data) {
+         String targetTopic = this.getBroadcastTopicName();
+         try {
+             Message message = new Message(
+                 targetTopic,
+                 JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8)
+             );
+
+             AddressableMessageQueue messageQueue = this.topicRouteService.getAllMessageQueueView(targetTopic)
+                 .getWriteSelector().selectOne(true);
+             this.mqClientAPIFactory.getClient().sendMessageAsync(
+                 messageQueue.getBrokerAddr(),
+                 messageQueue.getBrokerName(),
+                 message,
+                 buildSendMessageRequestHeader(message, this.getSystemMessageProducerId(), messageQueue.getQueueId()),
+                 Duration.ofSeconds(3).toMillis()
+             ).whenCompleteAsync((sendResult, error) -> {
+                 // Log the topic the message was actually sent to instead of resolving it again.
+                 if (error != null) {
+                     log.error("send system message failed. data: {}, topic: {}", data, targetTopic, error);
+                     return;
+                 }
+                 if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
+                     log.error("send system message failed. data: {}, topic: {}, sendResult:{}", data, targetTopic, sendResult);
+                 }
+             });
+         } catch (Throwable t) {
+             log.error("send system message failed. data: {}, topic: {}", data, targetTopic, t);
+         }
+     }
+
+     /**
+      * Builds the request header for a single, non-batch send of {@code message}.
+      *
+      * @param message message being sent; supplies topic, flag and properties
+      * @param producerGroup producer group written to the header
+      * @param queueId target queue id
+      * @return the populated header
+      */
+     protected SendMessageRequestHeader buildSendMessageRequestHeader(Message message,
+         String producerGroup, int queueId) {
+         SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+
+         requestHeader.setProducerGroup(producerGroup);
+         requestHeader.setTopic(message.getTopic());
+         requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC);
+         requestHeader.setDefaultTopicQueueNums(0);
+         requestHeader.setQueueId(queueId);
+         requestHeader.setSysFlag(0);
+         requestHeader.setBornTimestamp(System.currentTimeMillis());
+         requestHeader.setFlag(message.getFlag());
+         requestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
+         requestHeader.setReconsumeTimes(0);
+         requestHeader.setBatch(false);
+         return requestHeader;
+     }
+
+     /**
+      * Ensures the broadcast topic exists, then creates, configures and starts the push consumer.
+      *
+      * @throws ProxyException if the topic cannot be created or the subscription fails
+      * @throws Exception propagated from starting the consumer
+      */
+     @Override
+     public void start() throws Exception {
+         this.createSysTopic();
+         RPCHook rpcHook = this.getRpcHook();
+         this.defaultMQPushConsumer = new DefaultMQPushConsumer(null, this.getSystemMessageConsumerId(), rpcHook);
+
+         this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+         this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+         try {
+             this.defaultMQPushConsumer.subscribe(this.getBroadcastTopicName(), this.getSubTag());
+         } catch (MQClientException e) {
+             // The wrapping exception only carries e.getMessage(); log here so the stack trace is kept.
+             log.error("subscribe to broadcast topic {} failed", this.getBroadcastTopicName(), e);
+             throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "subscribe to broadcast topic " + this.getBroadcastTopicName() + " failed. " + e.getMessage());
+         }
+         this.defaultMQPushConsumer.registerMessageListener(this);
+         this.defaultMQPushConsumer.start();
+     }
+
+     /**
+      * Creates the broadcast topic on the configured cluster unless the admin service reports
+      * that it already exists.
+      *
+      * @throws ProxyException if the cluster name is empty or the creation is reported as failed
+      */
+     protected void createSysTopic() {
+         if (this.adminService.topicExist(this.getBroadcastTopicName())) {
+             return;
+         }
+
+         String clusterName = this.getBroadcastTopicClusterName();
+         if (StringUtils.isEmpty(clusterName)) {
+             throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "system topic cluster cannot be empty");
+         }
+
+         boolean createSuccess = this.adminService.createTopicOnTopicBrokerIfNotExist(
+             this.getBroadcastTopicName(),
+             clusterName,
+             this.getBroadcastTopicQueueNum(),
+             this.getBroadcastTopicQueueNum(),
+             true,
+             3
+         );
+         if (!createSuccess) {
+             throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "create system broadcast topic " + this.getBroadcastTopicName() + " failed on cluster " + clusterName);
+         }
+     }
+
+     /**
+      * Shuts the push consumer down. Safe to call when {@link #start()} was never invoked or
+      * failed before the consumer was created.
+      */
+     @Override
+     public void shutdown() throws Exception {
+         DefaultMQPushConsumer consumer = this.defaultMQPushConsumer;
+         if (consumer != null) {
+             consumer.shutdown();
+         }
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
new file mode 100644
index 000000000..ce3403766
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+
+ /**
+  * Shares consumer register/unregister events between proxies through the system-message topic
+  * of {@link AbstractSystemMessageSyncer}.
+  *
+  * <p>Outbound: {@link #onConsumerRegister} and {@link #onConsumerUnRegister} publish a
+  * {@link HeartbeatSyncerData} record for channels that are not remote. Inbound:
+  * {@link #consumeMessage} replays records published by other proxies on the local
+  * {@link ConsumerManagerInterface} using {@link RemoteChannel} instances.
+  */
+ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
+
+     /** Executor on which outbound records are built and sent. */
+     protected ThreadPoolExecutor threadPoolExecutor;
+     protected ConsumerManagerInterface consumerManager;
+     /** Decoded remote channels, cached so the same instance is reused for a given channel id. */
+     protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+
+     public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
+         ConsumerManagerInterface consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+         super(topicRouteService, adminService, mqClientAPIFactory);
+         this.consumerManager = consumerManager;
+         this.init();
+     }
+
+     /**
+      * Creates the executor from the proxy config and registers a listener that drops a cached
+      * remote channel when the consumer manager reports {@code CLIENT_UNREGISTER} for it.
+      */
+     protected void init() {
+         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+         this.threadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
+             proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+             proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+             1,
+             TimeUnit.MINUTES,
+             "HeartbeatSyncer",
+             proxyConfig.getHeartbeatSyncerThreadPoolQueueCapacity()
+         );
+         this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+             @Override
+             public void handle(ConsumerGroupEvent event, String s, Object... args) {
+                 if (event != ConsumerGroupEvent.CLIENT_UNREGISTER) {
+                     return;
+                 }
+                 if (args == null || args.length < 1) {
+                     return;
+                 }
+                 if (args[0] instanceof ClientChannelInfo) {
+                     ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                     remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
+                 }
+             }
+
+             @Override
+             public void shutdown() {
+                 // nothing to release
+             }
+         });
+     }
+
+     /** Stops accepting new broadcast tasks, then shuts the inherited consumer down. */
+     @Override
+     public void shutdown() throws Exception {
+         this.threadPoolExecutor.shutdown();
+         super.shutdown();
+     }
+
+     /**
+      * Publishes a {@link HeartbeatType#REGISTER} record for a consumer, asynchronously.
+      * Does nothing for a {@code null} channel info or a remote channel. Failures are logged.
+      */
+     public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
+         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+         Set<SubscriptionData> subList) {
+         if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+             return;
+         }
+         try {
+             this.threadPoolExecutor.submit(() -> {
+                 try {
+                     HeartbeatSyncerData data = this.buildSyncerData(HeartbeatType.REGISTER, consumerGroup,
+                         clientChannelInfo, consumeType, messageModel, consumeFromWhere);
+                     if (data == null) {
+                         return;
+                     }
+                     data.setSubscriptionDataSet(subList);
+
+                     this.sendSystemMessage(data);
+                 } catch (Throwable t) {
+                     log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                         consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+                 }
+             });
+         } catch (Throwable t) {
+             log.error("heartbeat submit register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                 consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+         }
+     }
+
+     /**
+      * Publishes a {@link HeartbeatType#UNREGISTER} record for a consumer, asynchronously.
+      * Does nothing for a {@code null} channel info or a remote channel. Failures are logged.
+      */
+     public void onConsumerUnRegister(String consumerGroup, ClientChannelInfo clientChannelInfo) {
+         if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+             return;
+         }
+         try {
+             this.threadPoolExecutor.submit(() -> {
+                 try {
+                     HeartbeatSyncerData data = this.buildSyncerData(HeartbeatType.UNREGISTER, consumerGroup,
+                         clientChannelInfo, null, null, null);
+                     if (data == null) {
+                         return;
+                     }
+
+                     this.sendSystemMessage(data);
+                 } catch (Throwable t) {
+                     // The format previously ended with a "consumeType:{}" placeholder that had no argument.
+                     log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                         consumerGroup, clientChannelInfo, t);
+                 }
+             });
+         } catch (Throwable t) {
+             log.error("heartbeat submit unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                 consumerGroup, clientChannelInfo, t);
+         }
+     }
+
+     /**
+      * Builds the record describing {@code clientChannelInfo} for the given event type.
+      *
+      * @return the record, or {@code null} when no {@link RemoteChannel} can be created from the channel
+      */
+     private HeartbeatSyncerData buildSyncerData(HeartbeatType heartbeatType, String consumerGroup,
+         ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel,
+         ConsumeFromWhere consumeFromWhere) {
+         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+         RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+         if (remoteChannel == null) {
+             return null;
+         }
+         return new HeartbeatSyncerData(
+             heartbeatType,
+             clientChannelInfo.getClientId(),
+             clientChannelInfo.getLanguage(),
+             clientChannelInfo.getVersion(),
+             consumerGroup,
+             consumeType,
+             messageModel,
+             consumeFromWhere,
+             proxyConfig.getLocalServeAddr(),
+             remoteChannel.encode(),
+             remoteChannel.getChannelExtendAttribute()
+         );
+     }
+
+     /**
+      * Applies records published by other proxies to the local consumer manager. Records whose
+      * {@code connectProxyIp} equals the local serve address are skipped. A failure on one
+      * message is logged and does not affect the others.
+      *
+      * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
+      */
+     @Override
+     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+         if (msgs == null || msgs.isEmpty()) {
+             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+         }
+
+         for (MessageExt msg : msgs) {
+             byte[] body = msg.getBody();
+             if (body == null) {
+                 // Decoding a null body would throw again inside the catch block below and
+                 // escape this listener, so report it here and move on.
+                 log.error("heartbeat consume message failed, body is null. msg:{}", msg);
+                 continue;
+             }
+             String json = new String(body, StandardCharsets.UTF_8);
+             try {
+                 HeartbeatSyncerData data = JSON.parseObject(json, HeartbeatSyncerData.class);
+                 if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+                     continue;
+                 }
+
+                 RemoteChannel decoded = RemoteChannel.decode(data.getChannelData());
+                 String channelKey = decoded.id().asLongText();
+                 // Reuse the cached instance for this channel id when there is one.
+                 RemoteChannel channel = remoteChannelMap.computeIfAbsent(channelKey, key -> decoded);
+                 channel.setExtendAttribute(data.getChannelExtendAttribute());
+                 ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                     channel,
+                     data.getClientId(),
+                     data.getLanguage(),
+                     data.getVersion()
+                 );
+                 if (data.getHeartbeatType().equals(HeartbeatType.REGISTER)) {
+                     this.consumerManager.registerConsumer(
+                         data.getGroup(),
+                         clientChannelInfo,
+                         data.getConsumeType(),
+                         data.getMessageModel(),
+                         data.getConsumeFromWhere(),
+                         data.getSubscriptionDataSet(),
+                         false
+                     );
+                 } else {
+                     this.consumerManager.unregisterConsumer(
+                         data.getGroup(),
+                         clientChannelInfo,
+                         false
+                     );
+                     // Drop the cache entry here as well: the listener installed in init() only
+                     // runs on CLIENT_UNREGISTER, which presumably is not raised for a channel
+                     // that was never registered (TODO confirm), so the entry could stay forever.
+                     remoteChannelMap.remove(channelKey, channel);
+                 }
+             } catch (Throwable t) {
+                 log.error("heartbeat consume message failed. msg:{}, data:{}", msg, json, t);
+             }
+         }
+
+         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
new file mode 100644
index 000000000..20fee7aac
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import java.util.Set;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+ /**
+  * Mutable record describing one consumer register/unregister event. {@code HeartbeatSyncer}
+  * serialises it to JSON when publishing and parses it back when consuming.
+  */
+ public class HeartbeatSyncerData {
+     /** Whether the event is a registration or an un-registration. */
+     private HeartbeatType heartbeatType;
+     private String clientId;
+     private LanguageCode language;
+     private int version;
+     /** Defaults to the time this object was created. */
+     private long lastUpdateTimestamp = System.currentTimeMillis();
+     /** Not set by the constructor; assigned through its setter. */
+     private Set<SubscriptionData> subscriptionDataSet;
+     private String group;
+     private ConsumeType consumeType;
+     private MessageModel messageModel;
+     private ConsumeFromWhere consumeFromWhere;
+     /** Serve address of the proxy that produced the record. */
+     private String connectProxyIp;
+     /** Encoded form of the client's channel. */
+     private String channelData;
+     private String channelExtendAttribute;
+
+     /**
+      * Creates a record with every field except {@code subscriptionDataSet} and
+      * {@code lastUpdateTimestamp} taken from the arguments.
+      */
+     public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
+         LanguageCode language, int version, String group,
+         ConsumeType consumeType, MessageModel messageModel,
+         ConsumeFromWhere consumeFromWhere, String connectProxyIp,
+         String channelData, String channelExtendAttribute) {
+         this.heartbeatType = heartbeatType;
+         this.clientId = clientId;
+         this.language = language;
+         this.version = version;
+         this.group = group;
+         this.consumeType = consumeType;
+         this.messageModel = messageModel;
+         this.consumeFromWhere = consumeFromWhere;
+         this.connectProxyIp = connectProxyIp;
+         this.channelData = channelData;
+         this.channelExtendAttribute = channelExtendAttribute;
+     }
+
+     public HeartbeatType getHeartbeatType() {
+         return this.heartbeatType;
+     }
+
+     public void setHeartbeatType(HeartbeatType heartbeatType) {
+         this.heartbeatType = heartbeatType;
+     }
+
+     public String getClientId() {
+         return this.clientId;
+     }
+
+     public void setClientId(String clientId) {
+         this.clientId = clientId;
+     }
+
+     public LanguageCode getLanguage() {
+         return this.language;
+     }
+
+     public void setLanguage(LanguageCode language) {
+         this.language = language;
+     }
+
+     public int getVersion() {
+         return this.version;
+     }
+
+     public void setVersion(int version) {
+         this.version = version;
+     }
+
+     public long getLastUpdateTimestamp() {
+         return this.lastUpdateTimestamp;
+     }
+
+     public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+         this.lastUpdateTimestamp = lastUpdateTimestamp;
+     }
+
+     public Set<SubscriptionData> getSubscriptionDataSet() {
+         return this.subscriptionDataSet;
+     }
+
+     public void setSubscriptionDataSet(
+         Set<SubscriptionData> subscriptionDataSet) {
+         this.subscriptionDataSet = subscriptionDataSet;
+     }
+
+     public String getGroup() {
+         return this.group;
+     }
+
+     public void setGroup(String group) {
+         this.group = group;
+     }
+
+     public ConsumeType getConsumeType() {
+         return this.consumeType;
+     }
+
+     public void setConsumeType(ConsumeType consumeType) {
+         this.consumeType = consumeType;
+     }
+
+     public MessageModel getMessageModel() {
+         return this.messageModel;
+     }
+
+     public void setMessageModel(MessageModel messageModel) {
+         this.messageModel = messageModel;
+     }
+
+     public ConsumeFromWhere getConsumeFromWhere() {
+         return this.consumeFromWhere;
+     }
+
+     public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+         this.consumeFromWhere = consumeFromWhere;
+     }
+
+     public String getConnectProxyIp() {
+         return this.connectProxyIp;
+     }
+
+     public void setConnectProxyIp(String connectProxyIp) {
+         this.connectProxyIp = connectProxyIp;
+     }
+
+     public String getChannelData() {
+         return this.channelData;
+     }
+
+     public void setChannelData(String channelData) {
+         this.channelData = channelData;
+     }
+
+     public String getChannelExtendAttribute() {
+         return this.channelExtendAttribute;
+     }
+
+     public void setChannelExtendAttribute(String channelExtendAttribute) {
+         this.channelExtendAttribute = channelExtendAttribute;
+     }
+ }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java
new file mode 100644
index 000000000..8f0801f54
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+ /**
+  * Kind of consumer event carried by a heartbeat sync record.
+  */
+ public enum HeartbeatType {
+     /** A consumer was registered. */
+     REGISTER,
+     /** A consumer was unregistered. */
+     UNREGISTER
+ }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index 3a50d842f..eb90b9205 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -64,6 +65,8 @@ public class ProxyClientRemotingProcessorTest {
@Mock
private ProducerManager producerManager;
@Mock
+ private GrpcClientSettingsManager grpcClientSettingsManager;
+ @Mock
private ProxyRelayService proxyRelayService;
@Test
@@ -74,7 +77,7 @@ public class ProxyClientRemotingProcessorTest {
new TransactionData("brokerName", 0, 0, "id", System.currentTimeMillis(), 3000),
proxyRelayResultFuture));
- GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, null,
+ GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null,
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "clientId");
when(producerManager.getAvailableChannel(anyString()))
.thenReturn(grpcClientChannel);