You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/05/15 06:04:09 UTC

[rocketmq] branch develop updated: Remove filter server module (#6749)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f1b411cec Remove filter server module (#6749)
f1b411cec is described below

commit f1b411cecc3a9c441fdec2caf5867601419f3fc0
Author: rongtong <ji...@163.com>
AuthorDate: Mon May 15 14:04:01 2023 +0800

    Remove filter server module (#6749)
    
    * Remove filter server module
    
    * Pass the check style
    
    * Remove filterServerNum config
    
    * Remove more related code
---
 .../apache/rocketmq/broker/BrokerController.java   |  19 +--
 .../broker/client/ClientHousekeepingService.java   |   4 -
 .../broker/filtersrv/FilterServerManager.java      | 169 ---------------------
 .../broker/filtersrv/FilterServerUtil.java         |  42 -----
 .../broker/processor/AdminBrokerProcessor.java     |  21 ---
 .../broker/filtersrv/FilterServerManagerTest.java  |  88 -----------
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  29 ----
 .../client/impl/factory/MQClientInstance.java      |  69 ---------
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 --
 .../RegisterFilterServerRequestHeader.java         |  39 -----
 .../RegisterFilterServerResponseHeader.java        |  49 ------
 .../RegisterMessageFilterClassRequestHeader.java   |  69 ---------
 12 files changed, 2 insertions(+), 606 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 22c403eaf..329bd86c0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.AbstractMap;
@@ -55,7 +56,6 @@ import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
-import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
 import org.apache.rocketmq.broker.latency.BrokerFastFailure;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
@@ -212,7 +212,6 @@ public class BrokerController {
     protected final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
     protected final BlockingQueue<Runnable> adminBrokerThreadPoolQueue;
     protected final BlockingQueue<Runnable> loadBalanceThreadPoolQueue;
-    protected final FilterServerManager filterServerManager;
     protected final BrokerStatsManager brokerStatsManager;
     protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>();
     protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
@@ -327,8 +326,6 @@ public class BrokerController {
             this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
         }
 
-        this.filterServerManager = new FilterServerManager(this);
-
         this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
         this.clientManageProcessor = new ClientManageProcessor(this);
         this.slaveSynchronize = new SlaveSynchronize(this);
@@ -1353,10 +1350,6 @@ public class BrokerController {
 
         this.consumerOffsetManager.persist();
 
-        if (this.filterServerManager != null) {
-            this.filterServerManager.shutdown();
-        }
-
         if (this.brokerFastFailure != null) {
             this.brokerFastFailure.shutdown();
         }
@@ -1530,10 +1523,6 @@ public class BrokerController {
             this.clientHousekeepingService.start();
         }
 
-        if (this.filterServerManager != null) {
-            this.filterServerManager.start();
-        }
-
         if (this.brokerStatsManager != null) {
             this.brokerStatsManager.start();
         }
@@ -1730,7 +1719,7 @@ public class BrokerController {
             this.brokerConfig.getBrokerId(),
             this.getHAServerAddr(),
             topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
+            Lists.newArrayList(),
             oneway,
             this.brokerConfig.getRegisterBrokerTimeoutMills(),
             this.brokerConfig.isEnableSlaveActingMaster(),
@@ -2087,10 +2076,6 @@ public class BrokerController {
         return ackThreadPoolQueue;
     }
 
-    public FilterServerManager getFilterServerManager() {
-        return filterServerManager;
-    }
-
     public BrokerStatsManager getBrokerStatsManager() {
         return brokerStatsManager;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index dafb50d36..98e5f450f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -56,7 +56,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
     private void scanExceptionChannel() {
         this.brokerController.getProducerManager().scanNotActiveChannel();
         this.brokerController.getConsumerManager().scanNotActiveChannel();
-        this.brokerController.getFilterServerManager().scanNotActiveChannel();
     }
 
     public void shutdown() {
@@ -72,7 +71,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
     public void onChannelClose(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getBrokerStatsManager().incChannelCloseNum();
     }
 
@@ -80,7 +78,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
     public void onChannelException(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getBrokerStatsManager().incChannelExceptionNum();
     }
 
@@ -88,7 +85,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
     public void onChannelIdle(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getBrokerStatsManager().incChannelIdleNum();
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
deleted file mode 100644
index 57497f904..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.BrokerStartup;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.utils.NetworkUtil;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
-public class FilterServerManager {
-
-    public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
-        new ConcurrentHashMap<>(16);
-    private final BrokerController brokerController;
-
-    private ScheduledExecutorService scheduledExecutorService = Executors
-        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
-
-    public FilterServerManager(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public void start() {
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
-            @Override
-            public void run0() {
-                try {
-                    FilterServerManager.this.createFilterServer();
-                } catch (Exception e) {
-                    log.error("", e);
-                }
-            }
-        }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
-    }
-
-    public void createFilterServer() {
-        int more =
-            this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
-        String cmd = this.buildStartCommand();
-        for (int i = 0; i < more; i++) {
-            FilterServerUtil.callShell(cmd, log);
-        }
-    }
-
-    private String buildStartCommand() {
-        String config = "";
-        if (BrokerStartup.CONFIG_FILE_HELPER.getFile() != null) {
-            config = String.format("-c %s", BrokerStartup.CONFIG_FILE_HELPER.getFile());
-        }
-
-        if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
-            config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
-        }
-
-        if (NetworkUtil.isWindowsPlatform()) {
-            return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
-                this.brokerController.getBrokerConfig().getRocketmqHome(),
-                config);
-        } else {
-            return String.format("sh %s/bin/startfsrv.sh %s",
-                this.brokerController.getBrokerConfig().getRocketmqHome(),
-                config);
-        }
-    }
-
-    public void shutdown() {
-        this.scheduledExecutorService.shutdown();
-    }
-
-    public void registerFilterServer(final Channel channel, final String filterServerAddr) {
-        FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
-        if (filterServerInfo != null) {
-            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
-        } else {
-            filterServerInfo = new FilterServerInfo();
-            filterServerInfo.setFilterServerAddr(filterServerAddr);
-            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
-            this.filterServerTable.put(channel, filterServerInfo);
-            log.info("Receive a New Filter Server<{}>", filterServerAddr);
-        }
-    }
-
-    public void scanNotActiveChannel() {
-
-        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Channel, FilterServerInfo> next = it.next();
-            long timestamp = next.getValue().getLastUpdateTimestamp();
-            Channel channel = next.getKey();
-            if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
-                log.info("The Filter Server<{}> expired, remove it", next.getKey());
-                it.remove();
-                RemotingHelper.closeChannel(channel);
-            }
-        }
-    }
-
-    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
-        FilterServerInfo old = this.filterServerTable.remove(channel);
-        if (old != null) {
-            log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
-                remoteAddr);
-        }
-    }
-
-    public List<String> buildNewFilterServerList() {
-        List<String> addr = new ArrayList<>();
-        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Channel, FilterServerInfo> next = it.next();
-            addr.add(next.getValue().getFilterServerAddr());
-        }
-        return addr;
-    }
-
-    static class FilterServerInfo {
-        private String filterServerAddr;
-        private long lastUpdateTimestamp;
-
-        public String getFilterServerAddr() {
-            return filterServerAddr;
-        }
-
-        public void setFilterServerAddr(String filterServerAddr) {
-            this.filterServerAddr = filterServerAddr;
-        }
-
-        public long getLastUpdateTimestamp() {
-            return lastUpdateTimestamp;
-        }
-
-        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
-            this.lastUpdateTimestamp = lastUpdateTimestamp;
-        }
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
deleted file mode 100644
index dc1a5f850..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-
-public class FilterServerUtil {
-    public static void callShell(final String shellString, final Logger log) {
-        Process process = null;
-        try {
-            String[] cmdArray = splitShellString(shellString);
-            process = Runtime.getRuntime().exec(cmdArray);
-            process.waitFor();
-            log.info("CallShell: <{}> OK", shellString);
-        } catch (Throwable e) {
-            log.error("CallShell: readLine IOException, {}", shellString, e);
-        } finally {
-            if (null != process)
-                process.destroy();
-        }
-    }
-
-    private static String[] splitShellString(final String shellString) {
-        return shellString.split(" ");
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index be673b916..0a05239e7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -161,8 +161,6 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
@@ -263,8 +261,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return this.queryTopicsByConsumer(ctx, request);
             case RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER:
                 return this.querySubscriptionByConsumer(ctx, request);
-            case RequestCode.REGISTER_FILTER_SERVER:
-                return this.registerFilterServer(ctx, request);
             case RequestCode.QUERY_CONSUME_TIME_SPAN:
                 return this.queryConsumeTimeSpan(ctx, request);
             case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
@@ -1862,23 +1858,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     }
 
-    private RemotingCommand registerFilterServer(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
-        final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
-        final RegisterFilterServerRequestHeader requestHeader =
-            (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
-
-        this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
-
-        responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
-        responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-        return response;
-    }
-
     private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java
deleted file mode 100644
index 46cd460d3..000000000
--- a/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-import io.netty.channel.Channel;
-import java.util.List;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class FilterServerManagerTest {
-
-    @Mock
-    private BrokerController brokerController;
-
-    private FilterServerManager filterServerManager;
-
-    private BrokerConfig brokerConfig = new BrokerConfig();
-
-    @Mock
-    private Channel channel;
-
-    private static final String FILTER_SERVER_ADDR = "192.168.1.1";
-
-    @Before
-    public void before() throws InterruptedException {
-        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
-        filterServerManager = new FilterServerManager(brokerController);
-        filterServerManager.start();
-        filterServerManager.registerFilterServer(channel, FILTER_SERVER_ADDR);
-    }
-
-    @After
-    public void after() {
-        filterServerManager.shutdown();
-        brokerController.shutdown();
-    }
-
-    @Test
-    public void createFilterServerTest() {
-        Assertions.assertThatCode(() ->  filterServerManager.createFilterServer()).doesNotThrowAnyException();
-    }
-
-    @Test
-    public void registerFilterServerTest() {
-        Assertions.assertThatCode(() ->  filterServerManager.registerFilterServer(channel, FILTER_SERVER_ADDR)).doesNotThrowAnyException();
-    }
-
-    @Test
-    public void scanNotActiveChannelTest() {
-        Assertions.assertThatCode(() ->  filterServerManager.scanNotActiveChannel()).doesNotThrowAnyException();
-    }
-
-    @Test
-    public void doChannelCloseEventTest() {
-        Assertions.assertThatCode(() -> filterServerManager.doChannelCloseEvent(FILTER_SERVER_ADDR, channel)).doesNotThrowAnyException();
-    }
-
-    @Test
-    public void buildNewFilterServerListTest() {
-        final List<String> filterServerList = filterServerManager.buildNewFilterServerList();
-        assert !filterServerList.isEmpty();
-    }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 192111415..2c7a988ee 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -193,7 +193,6 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
 import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
@@ -2258,34 +2257,6 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public void registerMessageFilterClass(final String addr,
-        final String consumerGroup,
-        final String topic,
-        final String className,
-        final int classCRC,
-        final byte[] classBody,
-        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        InterruptedException, MQBrokerException {
-        RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
-        requestHeader.setConsumerGroup(consumerGroup);
-        requestHeader.setClassName(className);
-        requestHeader.setTopic(topic);
-        requestHeader.setClassCRC(classCRC);
-
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, requestHeader);
-        request.setBody(classBody);
-        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return;
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
-    }
-
     public TopicList getSystemTopicList(
         final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index dedfa09ce..703bec4a2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -57,7 +57,6 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -71,7 +70,6 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@@ -462,7 +460,6 @@ public class MQClientInstance {
         if (this.lockHeartbeat.tryLock()) {
             try {
                 this.sendHeartbeatToAllBroker();
-                this.uploadFilterClassSource();
             } catch (final Exception e) {
                 log.error("sendHeartbeatToAllBroker exception", e);
             } finally {
@@ -566,29 +563,6 @@ public class MQClientInstance {
         }
     }
 
-    private void uploadFilterClassSource() {
-        for (Entry<String, MQConsumerInner> next : this.consumerTable.entrySet()) {
-            MQConsumerInner consumer = next.getValue();
-            if (ConsumeType.CONSUME_PASSIVELY != consumer.consumeType()) {
-                continue;
-            }
-            Set<SubscriptionData> subscriptions = consumer.subscriptions();
-            for (SubscriptionData sub : subscriptions) {
-                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
-                    final String consumerGroup = consumer.groupName();
-                    final String className = sub.getSubString();
-                    final String topic = sub.getTopic();
-                    final String filterClassSource = sub.getFilterClassSource();
-                    try {
-                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
-                    } catch (Exception e) {
-                        log.error("uploadFilterClassToAllFilterServer Exception", e);
-                    }
-                }
-            }
-        }
-    }
-
     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
         DefaultMQProducer defaultMQProducer) {
         try {
@@ -729,49 +703,6 @@ public class MQClientInstance {
         return false;
     }
 
-    /**
-     * This method will be removed in the version 5.0.0,because filterServer was removed,and method
-     * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
-     */
-    @Deprecated
-    private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
-        final String topic,
-        final String filterClassSource) {
-        byte[] classBody = null;
-        int classCRC = 0;
-        try {
-            classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
-            classCRC = UtilAll.crc32(classBody);
-        } catch (Exception e1) {
-            log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",
-                fullClassName,
-                UtilAll.exceptionSimpleDesc(e1));
-        }
-
-        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
-        if (topicRouteData != null
-            && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
-            for (Entry<String, List<String>> next : topicRouteData.getFilterServerTable().entrySet()) {
-                List<String> value = next.getValue();
-                for (final String fsAddr : value) {
-                    try {
-                        this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
-                            5000);
-
-                        log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
-                            topic, fullClassName);
-
-                    } catch (Exception e) {
-                        log.error("uploadFilterClassToAllFilterServer Exception", e);
-                    }
-                }
-            }
-        } else {
-            log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
-                consumerGroup, topic, fullClassName);
-        }
-    }
-
     private boolean isNeedUpdateTopicRouteInfo(final String topic) {
         boolean result = false;
         Iterator<Entry<String, MQProducerInner>> producerIterator = this.producerTable.entrySet().iterator();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 08fbcb521..07640232f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -112,8 +112,6 @@ public class BrokerConfig extends BrokerIdentity {
     private int adminBrokerThreadPoolQueueCapacity = 10000;
     private int loadBalanceThreadPoolQueueCapacity = 100000;
 
-    private int filterServerNums = 0;
-
     private boolean longPollingEnable = true;
 
     private long shortPollingTimeMills = 1000;
@@ -925,14 +923,6 @@ public class BrokerConfig extends BrokerIdentity {
         this.brokerTopicEnable = brokerTopicEnable;
     }
 
-    public int getFilterServerNums() {
-        return filterServerNums;
-    }
-
-    public void setFilterServerNums(int filterServerNums) {
-        this.filterServerNums = filterServerNums;
-    }
-
     public boolean isLongPollingEnable() {
         return longPollingEnable;
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java
deleted file mode 100644
index 14dacf6ec..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.protocol.header.filtersrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RegisterFilterServerRequestHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String filterServerAddr;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
-
-    public String getFilterServerAddr() {
-        return filterServerAddr;
-    }
-
-    public void setFilterServerAddr(String filterServerAddr) {
-        this.filterServerAddr = filterServerAddr;
-    }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
deleted file mode 100644
index a618a4f30..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.protocol.header.filtersrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RegisterFilterServerResponseHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String brokerName;
-    @CFNotNull
-    private long brokerId;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
-
-    public long getBrokerId() {
-        return brokerId;
-    }
-
-    public void setBrokerId(long brokerId) {
-        this.brokerId = brokerId;
-    }
-
-    public String getBrokerName() {
-        return brokerName;
-    }
-
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
-    }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
deleted file mode 100644
index b214ee5cd..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.protocol.header.filtersrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RegisterMessageFilterClassRequestHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String consumerGroup;
-    @CFNotNull
-    private String topic;
-    @CFNotNull
-    private String className;
-    @CFNotNull
-    private Integer classCRC;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public String getClassName() {
-        return className;
-    }
-
-    public void setClassName(String className) {
-        this.className = className;
-    }
-
-    public Integer getClassCRC() {
-        return classCRC;
-    }
-
-    public void setClassCRC(Integer classCRC) {
-        this.classCRC = classCRC;
-    }
-}