You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:04 UTC

[rocketmq] 23/26: [ISSUE #5485] Remove ConsumerManagerInterface and ProducerManagerInterface

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 8c9a9d36b844390cccc2ab083ed63d5b222e256c
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Dec 6 16:50:59 2022 +0800

    [ISSUE #5485] Remove ConsumerManagerInterface and ProducerManagerInterface
---
 .../rocketmq/broker/client/ConsumerManager.java    | 13 +----
 .../broker/client/ConsumerManagerInterface.java    | 60 ----------------------
 .../rocketmq/broker/client/ProducerManager.java    | 12 +----
 .../broker/client/ProducerManagerInterface.java    | 44 ----------------
 .../rocketmq/proxy/service/ServiceManager.java     |  8 +--
 .../service/client/ClusterConsumerManager.java     |  9 ++--
 .../proxy/service/sysmessage/HeartbeatSyncer.java  | 12 ++---
 .../service/sysmessage/HeartbeatSyncerTest.java    |  4 +-
 8 files changed, 18 insertions(+), 144 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index a70e8579e..787dcdbd2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-public class ConsumerManager implements ConsumerManagerInterface {
+public class ConsumerManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
         new ConcurrentHashMap<>(1024);
@@ -64,7 +64,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
     }
 
-    @Override
     public ClientChannelInfo findChannel(final String group, final String clientId) {
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
         if (consumerGroupInfo != null) {
@@ -73,7 +72,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return null;
     }
 
-    @Override
     public ClientChannelInfo findChannel(final String group, final Channel channel) {
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
         if (consumerGroupInfo != null) {
@@ -82,7 +80,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return null;
     }
 
-    @Override
     public SubscriptionData findSubscriptionData(final String group, final String topic) {
         return findSubscriptionData(group, topic, true);
     }
@@ -110,7 +107,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return this.consumerTable;
     }
 
-    @Override
     public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
         return getConsumerGroupInfo(group, false);
     }
@@ -123,7 +119,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return consumerGroupInfo;
     }
 
-    @Override
     public int findSubscriptionDataCount(final String group) {
         ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
         if (consumerGroupInfo != null) {
@@ -133,7 +128,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return 0;
     }
 
-    @Override
     public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         boolean removed = false;
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -178,7 +172,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
             isNotifyConsumerIdsChangedEnable, true);
     }
 
-    @Override
     public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
         final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
@@ -214,7 +207,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return r1 || r2;
     }
 
-    @Override
     public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
         boolean isNotifyConsumerIdsChangedEnable) {
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
@@ -260,7 +252,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         }
     }
 
-    @Override
     public void scanNotActiveChannel() {
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -295,7 +286,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         removeExpireConsumerGroupInfo();
     }
 
-    @Override
     public HashSet<String> queryTopicConsumeByWho(final String topic) {
         HashSet<String> groups = new HashSet<>();
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -310,7 +300,6 @@ public class ConsumerManager implements ConsumerManagerInterface {
         return groups;
     }
 
-    @Override
     public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
         consumerIdsChangeListenerList.add(listener);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
deleted file mode 100644
index 6998f60e7..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.client;
-
-import io.netty.channel.Channel;
-import java.util.Set;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-
-public interface ConsumerManagerInterface {
-
-    ClientChannelInfo findChannel(String group, String clientId);
-
-    ClientChannelInfo findChannel(String group, Channel channel);
-
-    SubscriptionData findSubscriptionData(String group, String topic);
-
-    ConsumerGroupInfo getConsumerGroupInfo(String group);
-
-    int findSubscriptionDataCount(String group);
-
-    boolean doChannelCloseEvent(String remoteAddr, Channel channel);
-
-    default boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
-        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
-        Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
-        return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList,
-            isNotifyConsumerIdsChangedEnable, true);
-    }
-
-    boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
-        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
-        Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription);
-
-    void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo,
-        boolean isNotifyConsumerIdsChangedEnable);
-
-    void scanNotActiveChannel();
-
-    Set<String> queryTopicConsumeByWho(String topic);
-
-    void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener);
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index a3ed9c590..52d67bf28 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.remoting.protocol.body.ProducerInfo;
 import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-public class ProducerManager implements ProducerManagerInterface {
+public class ProducerManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
@@ -54,12 +54,10 @@ public class ProducerManager implements ProducerManagerInterface {
         this.brokerStatsManager = brokerStatsManager;
     }
 
-    @Override
     public int groupSize() {
         return this.groupChannelTable.size();
     }
 
-    @Override
     public boolean groupOnline(String group) {
         Map<Channel, ClientChannelInfo> channels = this.groupChannelTable.get(group);
         return channels != null && !channels.isEmpty();
@@ -69,7 +67,6 @@ public class ProducerManager implements ProducerManagerInterface {
         return groupChannelTable;
     }
 
-    @Override
     public ProducerTableInfo getProducerTable() {
         Map<String, List<ProducerInfo>> map = new HashMap<>();
         for (String group : this.groupChannelTable.keySet()) {
@@ -97,7 +94,6 @@ public class ProducerManager implements ProducerManagerInterface {
         return new ProducerTableInfo(map);
     }
 
-    @Override
     public void scanNotActiveChannel() {
         Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
 
@@ -133,7 +129,6 @@ public class ProducerManager implements ProducerManagerInterface {
         }
     }
 
-    @Override
     public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         boolean removed = false;
         if (channel != null) {
@@ -165,7 +160,6 @@ public class ProducerManager implements ProducerManagerInterface {
         return removed;
     }
 
-    @Override
     public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
         ClientChannelInfo clientChannelInfoFound = null;
 
@@ -189,7 +183,6 @@ public class ProducerManager implements ProducerManagerInterface {
         }
     }
 
-    @Override
     public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
         ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
         if (null != channelTable && !channelTable.isEmpty()) {
@@ -209,7 +202,6 @@ public class ProducerManager implements ProducerManagerInterface {
         }
     }
 
-    @Override
     public Channel getAvailableChannel(String groupId) {
         if (groupId == null) {
             return null;
@@ -250,7 +242,6 @@ public class ProducerManager implements ProducerManagerInterface {
         return lastActiveChannel;
     }
 
-    @Override
     public Channel findChannel(String clientId) {
         return clientChannelTable.get(clientId);
     }
@@ -266,7 +257,6 @@ public class ProducerManager implements ProducerManagerInterface {
         }
     }
 
-    @Override
     public void appendProducerChangeListener(ProducerChangeListener producerChangeListener) {
         producerChangeListenerList.add(producerChangeListener);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
deleted file mode 100644
index 5e2e7e5b0..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.client;
-
-import io.netty.channel.Channel;
-import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
-
-public interface ProducerManagerInterface {
-
-    int groupSize();
-
-    boolean groupOnline(String group);
-
-    ProducerTableInfo getProducerTable();
-
-    void scanNotActiveChannel();
-
-    boolean doChannelCloseEvent(String remoteAddr, Channel channel);
-
-    void registerProducer(String group, ClientChannelInfo clientChannelInfo);
-
-    void unregisterProducer(String group, ClientChannelInfo clientChannelInfo);
-
-    Channel getAvailableChannel(String groupId);
-
-    Channel findChannel(String clientId);
-
-    void appendProducerChangeListener(ProducerChangeListener producerChangeListener);
-}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
index ce84832ca..bfa2ed963 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
@@ -16,8 +16,8 @@
  */
 package org.apache.rocketmq.proxy.service;
 
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
-import org.apache.rocketmq.broker.client.ProducerManagerInterface;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.proxy.service.message.MessageService;
@@ -31,9 +31,9 @@ public interface ServiceManager extends StartAndShutdown {
 
     TopicRouteService getTopicRouteService();
 
-    ProducerManagerInterface getProducerManager();
+    ProducerManager getProducerManager();
 
-    ConsumerManagerInterface getConsumerManager();
+    ConsumerManager getConsumerManager();
 
     TransactionService getTransactionService();
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
index 3a98b5ee1..94f4c5232 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -21,18 +21,17 @@ import java.util.Set;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
 import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 
-public class ClusterConsumerManager extends ConsumerManager implements ConsumerManagerInterface, StartAndShutdown {
+public class ClusterConsumerManager extends ConsumerManager implements StartAndShutdown {
 
     protected HeartbeatSyncer heartbeatSyncer;
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 041cbcee6..9e333902e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -28,14 +28,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -44,15 +41,18 @@ import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 
 public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
 
     protected ThreadPoolExecutor threadPoolExecutor;
-    protected ConsumerManagerInterface consumerManager;
+    protected ConsumerManager consumerManager;
     protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
 
     public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
-        ConsumerManagerInterface consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+        ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
         super(topicRouteService, adminService, mqClientAPIFactory);
         this.consumerManager = consumerManager;
         this.init();
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 45e3942d6..95152186d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -34,7 +34,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -94,7 +94,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
     @Mock
     private AdminService adminService;
     @Mock
-    private ConsumerManagerInterface consumerManager;
+    private ConsumerManager consumerManager;
     @Mock
     private MQClientAPIFactory mqClientAPIFactory;
     @Mock