You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/15 06:32:41 UTC

[rocketmq] branch 4.9.x updated: Remove the filter server module

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new c469a60dc Remove the filter server module
c469a60dc is described below

commit c469a60dcca616b077caf2867b64582795ff8bfc
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Sat May 13 11:42:15 2023 +0800

    Remove the filter server module
---
 .../apache/rocketmq/broker/BrokerController.java   |  18 +--
 .../broker/client/ClientHousekeepingService.java   |   4 -
 .../broker/filtersrv/FilterServerManager.java      | 167 ---------------------
 .../broker/filtersrv/FilterServerUtil.java         |  42 ------
 .../broker/processor/AdminBrokerProcessor.java     |  21 ---
 5 files changed, 2 insertions(+), 250 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 58616bbe3..f2e20f976 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -44,7 +45,6 @@ import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
 import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
 import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
-import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
 import org.apache.rocketmq.broker.latency.BrokerFastFailure;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
@@ -145,7 +145,6 @@ public class BrokerController {
     private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
-    private final FilterServerManager filterServerManager;
     private final BrokerStatsManager brokerStatsManager;
     private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
     private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
@@ -198,7 +197,6 @@ public class BrokerController {
         this.broker2Client = new Broker2Client(this);
         this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
         this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
-        this.filterServerManager = new FilterServerManager(this);
 
         this.slaveSynchronize = new SlaveSynchronize(this);
 
@@ -802,10 +800,6 @@ public class BrokerController {
 
         this.consumerOffsetManager.persist();
 
-        if (this.filterServerManager != null) {
-            this.filterServerManager.shutdown();
-        }
-
         if (this.brokerFastFailure != null) {
             this.brokerFastFailure.shutdown();
         }
@@ -879,10 +873,6 @@ public class BrokerController {
             this.clientHousekeepingService.start();
         }
 
-        if (this.filterServerManager != null) {
-            this.filterServerManager.start();
-        }
-
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
             startProcessorByHa(messageStoreConfig.getBrokerRole());
             handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
@@ -963,7 +953,7 @@ public class BrokerController {
             this.brokerConfig.getBrokerId(),
             this.getHAServerAddr(),
             topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
+            Lists.newArrayList(),
             oneway,
             this.brokerConfig.getRegisterBrokerTimeoutMills(),
             this.brokerConfig.isCompressedRegister());
@@ -1034,10 +1024,6 @@ public class BrokerController {
         return sendThreadPoolQueue;
     }
 
-    public FilterServerManager getFilterServerManager() {
-        return filterServerManager;
-    }
-
     public BrokerStatsManager getBrokerStatsManager() {
         return brokerStatsManager;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index d536db505..5eb5d62dd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -55,7 +55,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
     private void scanExceptionChannel() {
         this.brokerController.getProducerManager().scanNotActiveChannel();
         this.brokerController.getConsumerManager().scanNotActiveChannel();
-        this.brokerController.getFilterServerManager().scanNotActiveChannel();
     }
 
     public void shutdown() {
@@ -71,20 +70,17 @@ public class ClientHousekeepingService implements ChannelEventListener {
     public void onChannelClose(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
     }
 
     @Override
     public void onChannelException(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
     }
 
     @Override
     public void onChannelIdle(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
deleted file mode 100644
index c1a860a91..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.BrokerStartup;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-
-public class FilterServerManager {
-
-    public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
-        new ConcurrentHashMap<Channel, FilterServerInfo>(16);
-    private final BrokerController brokerController;
-
-    private ScheduledExecutorService scheduledExecutorService = Executors
-        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
-
-    public FilterServerManager(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public void start() {
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    FilterServerManager.this.createFilterServer();
-                } catch (Exception e) {
-                    log.error("", e);
-                }
-            }
-        }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
-    }
-
-    public void createFilterServer() {
-        int more =
-            this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
-        String cmd = this.buildStartCommand();
-        for (int i = 0; i < more; i++) {
-            FilterServerUtil.callShell(cmd, log);
-        }
-    }
-
-    private String buildStartCommand() {
-        String config = "";
-        if (BrokerStartup.configFile != null) {
-            config = String.format("-c %s", BrokerStartup.configFile);
-        }
-
-        if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
-            config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
-        }
-
-        if (RemotingUtil.isWindowsPlatform()) {
-            return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
-                this.brokerController.getBrokerConfig().getRocketmqHome(),
-                config);
-        } else {
-            return String.format("sh %s/bin/startfsrv.sh %s",
-                this.brokerController.getBrokerConfig().getRocketmqHome(),
-                config);
-        }
-    }
-
-    public void shutdown() {
-        this.scheduledExecutorService.shutdown();
-    }
-
-    public void registerFilterServer(final Channel channel, final String filterServerAddr) {
-        FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
-        if (filterServerInfo != null) {
-            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
-        } else {
-            filterServerInfo = new FilterServerInfo();
-            filterServerInfo.setFilterServerAddr(filterServerAddr);
-            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
-            this.filterServerTable.put(channel, filterServerInfo);
-            log.info("Receive a New Filter Server<{}>", filterServerAddr);
-        }
-    }
-
-    public void scanNotActiveChannel() {
-
-        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Channel, FilterServerInfo> next = it.next();
-            long timestamp = next.getValue().getLastUpdateTimestamp();
-            Channel channel = next.getKey();
-            if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
-                log.info("The Filter Server<{}> expired, remove it", next.getKey());
-                it.remove();
-                RemotingUtil.closeChannel(channel);
-            }
-        }
-    }
-
-    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
-        FilterServerInfo old = this.filterServerTable.remove(channel);
-        if (old != null) {
-            log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
-                remoteAddr);
-        }
-    }
-
-    public List<String> buildNewFilterServerList() {
-        List<String> addr = new ArrayList<>();
-        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Channel, FilterServerInfo> next = it.next();
-            addr.add(next.getValue().getFilterServerAddr());
-        }
-        return addr;
-    }
-
-    static class FilterServerInfo {
-        private String filterServerAddr;
-        private long lastUpdateTimestamp;
-
-        public String getFilterServerAddr() {
-            return filterServerAddr;
-        }
-
-        public void setFilterServerAddr(String filterServerAddr) {
-            this.filterServerAddr = filterServerAddr;
-        }
-
-        public long getLastUpdateTimestamp() {
-            return lastUpdateTimestamp;
-        }
-
-        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
-            this.lastUpdateTimestamp = lastUpdateTimestamp;
-        }
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
deleted file mode 100644
index 3f4d24d06..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-
-import org.apache.rocketmq.logging.InternalLogger;
-
-public class FilterServerUtil {
-    public static void callShell(final String shellString, final InternalLogger log) {
-        Process process = null;
-        try {
-            String[] cmdArray = splitShellString(shellString);
-            process = Runtime.getRuntime().exec(cmdArray);
-            process.waitFor();
-            log.info("CallShell: <{}> OK", shellString);
-        } catch (Throwable e) {
-            log.error("CallShell: readLine IOException, {}", shellString, e);
-        } finally {
-            if (null != process)
-                process.destroy();
-        }
-    }
-
-    private static String[] splitShellString(final String shellString) {
-        return shellString.split(" ");
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ca99605f7..95594ac4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -101,8 +101,6 @@ import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
-import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
-import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.stats.StatsItem;
 import org.apache.rocketmq.common.stats.StatsSnapshot;
@@ -203,8 +201,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor {
                 return this.getConsumerStatus(ctx, request);
             case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
                 return this.queryTopicConsumeByWho(ctx, request);
-            case RequestCode.REGISTER_FILTER_SERVER:
-                return this.registerFilterServer(ctx, request);
             case RequestCode.QUERY_CONSUME_TIME_SPAN:
                 return this.queryConsumeTimeSpan(ctx, request);
             case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
@@ -1092,23 +1088,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand registerFilterServer(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
-        final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
-        final RegisterFilterServerRequestHeader requestHeader =
-            (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
-
-        this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
-
-        responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
-        responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-        return response;
-    }
-
     private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);