You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/15 06:32:41 UTC
[rocketmq] branch 4.9.x updated: Remove the filter server module
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new c469a60dc Remove the filter server module
c469a60dc is described below
commit c469a60dcca616b077caf2867b64582795ff8bfc
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Sat May 13 11:42:15 2023 +0800
Remove the filter server module
---
.../apache/rocketmq/broker/BrokerController.java | 18 +--
.../broker/client/ClientHousekeepingService.java | 4 -
.../broker/filtersrv/FilterServerManager.java | 167 ---------------------
.../broker/filtersrv/FilterServerUtil.java | 42 ------
.../broker/processor/AdminBrokerProcessor.java | 21 ---
5 files changed, 2 insertions(+), 250 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 58616bbe3..f2e20f976 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -44,7 +45,6 @@ import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
-import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
@@ -145,7 +145,6 @@ public class BrokerController {
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
- private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
@@ -198,7 +197,6 @@ public class BrokerController {
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
- this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);
@@ -802,10 +800,6 @@ public class BrokerController {
this.consumerOffsetManager.persist();
- if (this.filterServerManager != null) {
- this.filterServerManager.shutdown();
- }
-
if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
@@ -879,10 +873,6 @@ public class BrokerController {
this.clientHousekeepingService.start();
}
- if (this.filterServerManager != null) {
- this.filterServerManager.start();
- }
-
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
@@ -963,7 +953,7 @@ public class BrokerController {
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
+ Lists.newArrayList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
@@ -1034,10 +1024,6 @@ public class BrokerController {
return sendThreadPoolQueue;
}
- public FilterServerManager getFilterServerManager() {
- return filterServerManager;
- }
-
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index d536db505..5eb5d62dd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -55,7 +55,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
- this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
public void shutdown() {
@@ -71,20 +70,17 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void onChannelClose(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
deleted file mode 100644
index c1a860a91..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.BrokerStartup;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-
-public class FilterServerManager {
-
- public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
- new ConcurrentHashMap<Channel, FilterServerInfo>(16);
- private final BrokerController brokerController;
-
- private ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
-
- public FilterServerManager(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- public void start() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- FilterServerManager.this.createFilterServer();
- } catch (Exception e) {
- log.error("", e);
- }
- }
- }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
- }
-
- public void createFilterServer() {
- int more =
- this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
- String cmd = this.buildStartCommand();
- for (int i = 0; i < more; i++) {
- FilterServerUtil.callShell(cmd, log);
- }
- }
-
- private String buildStartCommand() {
- String config = "";
- if (BrokerStartup.configFile != null) {
- config = String.format("-c %s", BrokerStartup.configFile);
- }
-
- if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
- config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
- }
-
- if (RemotingUtil.isWindowsPlatform()) {
- return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
- } else {
- return String.format("sh %s/bin/startfsrv.sh %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
- }
- }
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-
- public void registerFilterServer(final Channel channel, final String filterServerAddr) {
- FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
- if (filterServerInfo != null) {
- filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
- } else {
- filterServerInfo = new FilterServerInfo();
- filterServerInfo.setFilterServerAddr(filterServerAddr);
- filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
- this.filterServerTable.put(channel, filterServerInfo);
- log.info("Receive a New Filter Server<{}>", filterServerAddr);
- }
- }
-
- public void scanNotActiveChannel() {
-
- Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, FilterServerInfo> next = it.next();
- long timestamp = next.getValue().getLastUpdateTimestamp();
- Channel channel = next.getKey();
- if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
- log.info("The Filter Server<{}> expired, remove it", next.getKey());
- it.remove();
- RemotingUtil.closeChannel(channel);
- }
- }
- }
-
- public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
- FilterServerInfo old = this.filterServerTable.remove(channel);
- if (old != null) {
- log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
- remoteAddr);
- }
- }
-
- public List<String> buildNewFilterServerList() {
- List<String> addr = new ArrayList<>();
- Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, FilterServerInfo> next = it.next();
- addr.add(next.getValue().getFilterServerAddr());
- }
- return addr;
- }
-
- static class FilterServerInfo {
- private String filterServerAddr;
- private long lastUpdateTimestamp;
-
- public String getFilterServerAddr() {
- return filterServerAddr;
- }
-
- public void setFilterServerAddr(String filterServerAddr) {
- this.filterServerAddr = filterServerAddr;
- }
-
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
-
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
- }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
deleted file mode 100644
index 3f4d24d06..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-
-import org.apache.rocketmq.logging.InternalLogger;
-
-public class FilterServerUtil {
- public static void callShell(final String shellString, final InternalLogger log) {
- Process process = null;
- try {
- String[] cmdArray = splitShellString(shellString);
- process = Runtime.getRuntime().exec(cmdArray);
- process.waitFor();
- log.info("CallShell: <{}> OK", shellString);
- } catch (Throwable e) {
- log.error("CallShell: readLine IOException, {}", shellString, e);
- } finally {
- if (null != process)
- process.destroy();
- }
- }
-
- private static String[] splitShellString(final String shellString) {
- return shellString.split(" ");
- }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ca99605f7..95594ac4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -101,8 +101,6 @@ import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
-import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
-import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
@@ -203,8 +201,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor {
return this.getConsumerStatus(ctx, request);
case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
return this.queryTopicConsumeByWho(ctx, request);
- case RequestCode.REGISTER_FILTER_SERVER:
- return this.registerFilterServer(ctx, request);
case RequestCode.QUERY_CONSUME_TIME_SPAN:
return this.queryConsumeTimeSpan(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
@@ -1092,23 +1088,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor {
return response;
}
- private RemotingCommand registerFilterServer(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
- final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
- final RegisterFilterServerRequestHeader requestHeader =
- (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
-
- this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
-
- responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
- responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
- }
-
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);