You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:04 UTC
[rocketmq] 23/26: [ISSUE #5485] Remove ConsumerManagerInterface and ProducerManagerInterface
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 8c9a9d36b844390cccc2ab083ed63d5b222e256c
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Dec 6 16:50:59 2022 +0800
[ISSUE #5485] Remove ConsumerManagerInterface and ProducerManagerInterface
---
.../rocketmq/broker/client/ConsumerManager.java | 13 +----
.../broker/client/ConsumerManagerInterface.java | 60 ----------------------
.../rocketmq/broker/client/ProducerManager.java | 12 +----
.../broker/client/ProducerManagerInterface.java | 44 ----------------
.../rocketmq/proxy/service/ServiceManager.java | 8 +--
.../service/client/ClusterConsumerManager.java | 9 ++--
.../proxy/service/sysmessage/HeartbeatSyncer.java | 12 ++---
.../service/sysmessage/HeartbeatSyncerTest.java | 4 +-
8 files changed, 18 insertions(+), 144 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index a70e8579e..787dcdbd2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-public class ConsumerManager implements ConsumerManagerInterface {
+public class ConsumerManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<>(1024);
@@ -64,7 +64,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
}
- @Override
public ClientChannelInfo findChannel(final String group, final String clientId) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (consumerGroupInfo != null) {
@@ -73,7 +72,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return null;
}
- @Override
public ClientChannelInfo findChannel(final String group, final Channel channel) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (consumerGroupInfo != null) {
@@ -82,7 +80,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return null;
}
- @Override
public SubscriptionData findSubscriptionData(final String group, final String topic) {
return findSubscriptionData(group, topic, true);
}
@@ -110,7 +107,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return this.consumerTable;
}
- @Override
public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
return getConsumerGroupInfo(group, false);
}
@@ -123,7 +119,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return consumerGroupInfo;
}
- @Override
public int findSubscriptionDataCount(final String group) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
@@ -133,7 +128,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return 0;
}
- @Override
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
boolean removed = false;
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -178,7 +172,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
isNotifyConsumerIdsChangedEnable, true);
}
- @Override
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
@@ -214,7 +207,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return r1 || r2;
}
- @Override
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
@@ -260,7 +252,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
}
}
- @Override
public void scanNotActiveChannel() {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
@@ -295,7 +286,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
removeExpireConsumerGroupInfo();
}
- @Override
public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -310,7 +300,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
return groups;
}
- @Override
public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
consumerIdsChangeListenerList.add(listener);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
deleted file mode 100644
index 6998f60e7..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.client;
-
-import io.netty.channel.Channel;
-import java.util.Set;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-
-public interface ConsumerManagerInterface {
-
- ClientChannelInfo findChannel(String group, String clientId);
-
- ClientChannelInfo findChannel(String group, Channel channel);
-
- SubscriptionData findSubscriptionData(String group, String topic);
-
- ConsumerGroupInfo getConsumerGroupInfo(String group);
-
- int findSubscriptionDataCount(String group);
-
- boolean doChannelCloseEvent(String remoteAddr, Channel channel);
-
- default boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
- ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
- Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
- return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList,
- isNotifyConsumerIdsChangedEnable, true);
- }
-
- boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
- ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
- Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription);
-
- void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo,
- boolean isNotifyConsumerIdsChangedEnable);
-
- void scanNotActiveChannel();
-
- Set<String> queryTopicConsumeByWho(String topic);
-
- void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener);
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index a3ed9c590..52d67bf28 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.remoting.protocol.body.ProducerInfo;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-public class ProducerManager implements ProducerManagerInterface {
+public class ProducerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
@@ -54,12 +54,10 @@ public class ProducerManager implements ProducerManagerInterface {
this.brokerStatsManager = brokerStatsManager;
}
- @Override
public int groupSize() {
return this.groupChannelTable.size();
}
- @Override
public boolean groupOnline(String group) {
Map<Channel, ClientChannelInfo> channels = this.groupChannelTable.get(group);
return channels != null && !channels.isEmpty();
@@ -69,7 +67,6 @@ public class ProducerManager implements ProducerManagerInterface {
return groupChannelTable;
}
- @Override
public ProducerTableInfo getProducerTable() {
Map<String, List<ProducerInfo>> map = new HashMap<>();
for (String group : this.groupChannelTable.keySet()) {
@@ -97,7 +94,6 @@ public class ProducerManager implements ProducerManagerInterface {
return new ProducerTableInfo(map);
}
- @Override
public void scanNotActiveChannel() {
Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
@@ -133,7 +129,6 @@ public class ProducerManager implements ProducerManagerInterface {
}
}
- @Override
public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
boolean removed = false;
if (channel != null) {
@@ -165,7 +160,6 @@ public class ProducerManager implements ProducerManagerInterface {
return removed;
}
- @Override
public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo clientChannelInfoFound = null;
@@ -189,7 +183,6 @@ public class ProducerManager implements ProducerManagerInterface {
}
}
- @Override
public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
@@ -209,7 +202,6 @@ public class ProducerManager implements ProducerManagerInterface {
}
}
- @Override
public Channel getAvailableChannel(String groupId) {
if (groupId == null) {
return null;
@@ -250,7 +242,6 @@ public class ProducerManager implements ProducerManagerInterface {
return lastActiveChannel;
}
- @Override
public Channel findChannel(String clientId) {
return clientChannelTable.get(clientId);
}
@@ -266,7 +257,6 @@ public class ProducerManager implements ProducerManagerInterface {
}
}
- @Override
public void appendProducerChangeListener(ProducerChangeListener producerChangeListener) {
producerChangeListenerList.add(producerChangeListener);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
deleted file mode 100644
index 5e2e7e5b0..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.client;
-
-import io.netty.channel.Channel;
-import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
-
-public interface ProducerManagerInterface {
-
- int groupSize();
-
- boolean groupOnline(String group);
-
- ProducerTableInfo getProducerTable();
-
- void scanNotActiveChannel();
-
- boolean doChannelCloseEvent(String remoteAddr, Channel channel);
-
- void registerProducer(String group, ClientChannelInfo clientChannelInfo);
-
- void unregisterProducer(String group, ClientChannelInfo clientChannelInfo);
-
- Channel getAvailableChannel(String groupId);
-
- Channel findChannel(String clientId);
-
- void appendProducerChangeListener(ProducerChangeListener producerChangeListener);
-}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
index ce84832ca..bfa2ed963 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.proxy.service;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
-import org.apache.rocketmq.broker.client.ProducerManagerInterface;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.message.MessageService;
@@ -31,9 +31,9 @@ public interface ServiceManager extends StartAndShutdown {
TopicRouteService getTopicRouteService();
- ProducerManagerInterface getProducerManager();
+ ProducerManager getProducerManager();
- ConsumerManagerInterface getConsumerManager();
+ ConsumerManager getConsumerManager();
TransactionService getTransactionService();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
index 3a98b5ee1..94f4c5232 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -21,18 +21,17 @@ import java.util.Set;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-public class ClusterConsumerManager extends ConsumerManager implements ConsumerManagerInterface, StartAndShutdown {
+public class ClusterConsumerManager extends ConsumerManager implements StartAndShutdown {
protected HeartbeatSyncer heartbeatSyncer;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 041cbcee6..9e333902e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -28,14 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -44,15 +41,18 @@ import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
protected ThreadPoolExecutor threadPoolExecutor;
- protected ConsumerManagerInterface consumerManager;
+ protected ConsumerManager consumerManager;
protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
- ConsumerManagerInterface consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+ ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
super(topicRouteService, adminService, mqClientAPIFactory);
this.consumerManager = consumerManager;
this.init();
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 45e3942d6..95152186d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -34,7 +34,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -94,7 +94,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
@Mock
private AdminService adminService;
@Mock
- private ConsumerManagerInterface consumerManager;
+ private ConsumerManager consumerManager;
@Mock
private MQClientAPIFactory mqClientAPIFactory;
@Mock