You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/05/15 06:04:09 UTC
[rocketmq] branch develop updated: Remove filter server module (#6749)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f1b411cec Remove filter server module (#6749)
f1b411cec is described below
commit f1b411cecc3a9c441fdec2caf5867601419f3fc0
Author: rongtong <ji...@163.com>
AuthorDate: Mon May 15 14:04:01 2023 +0800
Remove filter server module (#6749)
* Remove filter server module
* Pass the check style
* Remove filterServerNum config
* Remove more related code
---
.../apache/rocketmq/broker/BrokerController.java | 19 +--
.../broker/client/ClientHousekeepingService.java | 4 -
.../broker/filtersrv/FilterServerManager.java | 169 ---------------------
.../broker/filtersrv/FilterServerUtil.java | 42 -----
.../broker/processor/AdminBrokerProcessor.java | 21 ---
.../broker/filtersrv/FilterServerManagerTest.java | 88 -----------
.../rocketmq/client/impl/MQClientAPIImpl.java | 29 ----
.../client/impl/factory/MQClientInstance.java | 69 ---------
.../org/apache/rocketmq/common/BrokerConfig.java | 10 --
.../RegisterFilterServerRequestHeader.java | 39 -----
.../RegisterFilterServerResponseHeader.java | 49 ------
.../RegisterMessageFilterClassRequestHeader.java | 69 ---------
12 files changed, 2 insertions(+), 606 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 22c403eaf..329bd86c0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
@@ -55,7 +56,6 @@ import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
-import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
@@ -212,7 +212,6 @@ public class BrokerController {
protected final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
protected final BlockingQueue<Runnable> adminBrokerThreadPoolQueue;
protected final BlockingQueue<Runnable> loadBalanceThreadPoolQueue;
- protected final FilterServerManager filterServerManager;
protected final BrokerStatsManager brokerStatsManager;
protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>();
protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
@@ -327,8 +326,6 @@ public class BrokerController {
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
}
- this.filterServerManager = new FilterServerManager(this);
-
this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
this.clientManageProcessor = new ClientManageProcessor(this);
this.slaveSynchronize = new SlaveSynchronize(this);
@@ -1353,10 +1350,6 @@ public class BrokerController {
this.consumerOffsetManager.persist();
- if (this.filterServerManager != null) {
- this.filterServerManager.shutdown();
- }
-
if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
@@ -1530,10 +1523,6 @@ public class BrokerController {
this.clientHousekeepingService.start();
}
- if (this.filterServerManager != null) {
- this.filterServerManager.start();
- }
-
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
@@ -1730,7 +1719,7 @@ public class BrokerController {
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
+ Lists.newArrayList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
@@ -2087,10 +2076,6 @@ public class BrokerController {
return ackThreadPoolQueue;
}
- public FilterServerManager getFilterServerManager() {
- return filterServerManager;
- }
-
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index dafb50d36..98e5f450f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -56,7 +56,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
- this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
public void shutdown() {
@@ -72,7 +71,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void onChannelClose(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getBrokerStatsManager().incChannelCloseNum();
}
@@ -80,7 +78,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void onChannelException(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getBrokerStatsManager().incChannelExceptionNum();
}
@@ -88,7 +85,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
public void onChannelIdle(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getBrokerStatsManager().incChannelIdleNum();
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
deleted file mode 100644
index 57497f904..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.BrokerStartup;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.utils.NetworkUtil;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
-public class FilterServerManager {
-
- public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
- new ConcurrentHashMap<>(16);
- private final BrokerController brokerController;
-
- private ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
-
- public FilterServerManager(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- public void start() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
- @Override
- public void run0() {
- try {
- FilterServerManager.this.createFilterServer();
- } catch (Exception e) {
- log.error("", e);
- }
- }
- }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
- }
-
- public void createFilterServer() {
- int more =
- this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
- String cmd = this.buildStartCommand();
- for (int i = 0; i < more; i++) {
- FilterServerUtil.callShell(cmd, log);
- }
- }
-
- private String buildStartCommand() {
- String config = "";
- if (BrokerStartup.CONFIG_FILE_HELPER.getFile() != null) {
- config = String.format("-c %s", BrokerStartup.CONFIG_FILE_HELPER.getFile());
- }
-
- if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
- config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
- }
-
- if (NetworkUtil.isWindowsPlatform()) {
- return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
- } else {
- return String.format("sh %s/bin/startfsrv.sh %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
- }
- }
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-
- public void registerFilterServer(final Channel channel, final String filterServerAddr) {
- FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
- if (filterServerInfo != null) {
- filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
- } else {
- filterServerInfo = new FilterServerInfo();
- filterServerInfo.setFilterServerAddr(filterServerAddr);
- filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
- this.filterServerTable.put(channel, filterServerInfo);
- log.info("Receive a New Filter Server<{}>", filterServerAddr);
- }
- }
-
- public void scanNotActiveChannel() {
-
- Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, FilterServerInfo> next = it.next();
- long timestamp = next.getValue().getLastUpdateTimestamp();
- Channel channel = next.getKey();
- if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
- log.info("The Filter Server<{}> expired, remove it", next.getKey());
- it.remove();
- RemotingHelper.closeChannel(channel);
- }
- }
- }
-
- public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
- FilterServerInfo old = this.filterServerTable.remove(channel);
- if (old != null) {
- log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
- remoteAddr);
- }
- }
-
- public List<String> buildNewFilterServerList() {
- List<String> addr = new ArrayList<>();
- Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, FilterServerInfo> next = it.next();
- addr.add(next.getValue().getFilterServerAddr());
- }
- return addr;
- }
-
- static class FilterServerInfo {
- private String filterServerAddr;
- private long lastUpdateTimestamp;
-
- public String getFilterServerAddr() {
- return filterServerAddr;
- }
-
- public void setFilterServerAddr(String filterServerAddr) {
- this.filterServerAddr = filterServerAddr;
- }
-
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
-
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
- }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
deleted file mode 100644
index dc1a5f850..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-
-public class FilterServerUtil {
- public static void callShell(final String shellString, final Logger log) {
- Process process = null;
- try {
- String[] cmdArray = splitShellString(shellString);
- process = Runtime.getRuntime().exec(cmdArray);
- process.waitFor();
- log.info("CallShell: <{}> OK", shellString);
- } catch (Throwable e) {
- log.error("CallShell: readLine IOException, {}", shellString, e);
- } finally {
- if (null != process)
- process.destroy();
- }
- }
-
- private static String[] splitShellString(final String shellString) {
- return shellString.split(" ");
- }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index be673b916..0a05239e7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -161,8 +161,6 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
@@ -263,8 +261,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return this.queryTopicsByConsumer(ctx, request);
case RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER:
return this.querySubscriptionByConsumer(ctx, request);
- case RequestCode.REGISTER_FILTER_SERVER:
- return this.registerFilterServer(ctx, request);
case RequestCode.QUERY_CONSUME_TIME_SPAN:
return this.queryConsumeTimeSpan(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
@@ -1862,23 +1858,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
- private RemotingCommand registerFilterServer(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
- final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
- final RegisterFilterServerRequestHeader requestHeader =
- (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
-
- this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
-
- responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
- responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
- }
-
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java
deleted file mode 100644
index 46cd460d3..000000000
--- a/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.filtersrv;
-
-import io.netty.channel.Channel;
-import java.util.List;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class FilterServerManagerTest {
-
- @Mock
- private BrokerController brokerController;
-
- private FilterServerManager filterServerManager;
-
- private BrokerConfig brokerConfig = new BrokerConfig();
-
- @Mock
- private Channel channel;
-
- private static final String FILTER_SERVER_ADDR = "192.168.1.1";
-
- @Before
- public void before() throws InterruptedException {
- when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
- filterServerManager = new FilterServerManager(brokerController);
- filterServerManager.start();
- filterServerManager.registerFilterServer(channel, FILTER_SERVER_ADDR);
- }
-
- @After
- public void after() {
- filterServerManager.shutdown();
- brokerController.shutdown();
- }
-
- @Test
- public void createFilterServerTest() {
- Assertions.assertThatCode(() -> filterServerManager.createFilterServer()).doesNotThrowAnyException();
- }
-
- @Test
- public void registerFilterServerTest() {
- Assertions.assertThatCode(() -> filterServerManager.registerFilterServer(channel, FILTER_SERVER_ADDR)).doesNotThrowAnyException();
- }
-
- @Test
- public void scanNotActiveChannelTest() {
- Assertions.assertThatCode(() -> filterServerManager.scanNotActiveChannel()).doesNotThrowAnyException();
- }
-
- @Test
- public void doChannelCloseEventTest() {
- Assertions.assertThatCode(() -> filterServerManager.doChannelCloseEvent(FILTER_SERVER_ADDR, channel)).doesNotThrowAnyException();
- }
-
- @Test
- public void buildNewFilterServerListTest() {
- final List<String> filterServerList = filterServerManager.buildNewFilterServerList();
- assert !filterServerList.isEmpty();
- }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 192111415..2c7a988ee 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -193,7 +193,6 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
@@ -2258,34 +2257,6 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void registerMessageFilterClass(final String addr,
- final String consumerGroup,
- final String topic,
- final String className,
- final int classCRC,
- final byte[] classBody,
- final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException, MQBrokerException {
- RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
- requestHeader.setConsumerGroup(consumerGroup);
- requestHeader.setClassName(className);
- requestHeader.setTopic(topic);
- requestHeader.setClassCRC(classCRC);
-
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, requestHeader);
- request.setBody(classBody);
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return;
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
- }
-
public TopicList getSystemTopicList(
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index dedfa09ce..703bec4a2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -57,7 +57,6 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
@@ -71,7 +70,6 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@@ -462,7 +460,6 @@ public class MQClientInstance {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
- this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
@@ -566,29 +563,6 @@ public class MQClientInstance {
}
}
- private void uploadFilterClassSource() {
- for (Entry<String, MQConsumerInner> next : this.consumerTable.entrySet()) {
- MQConsumerInner consumer = next.getValue();
- if (ConsumeType.CONSUME_PASSIVELY != consumer.consumeType()) {
- continue;
- }
- Set<SubscriptionData> subscriptions = consumer.subscriptions();
- for (SubscriptionData sub : subscriptions) {
- if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
- final String consumerGroup = consumer.groupName();
- final String className = sub.getSubString();
- final String topic = sub.getTopic();
- final String filterClassSource = sub.getFilterClassSource();
- try {
- this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
- } catch (Exception e) {
- log.error("uploadFilterClassToAllFilterServer Exception", e);
- }
- }
- }
- }
- }
-
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
@@ -729,49 +703,6 @@ public class MQClientInstance {
return false;
}
- /**
- * This method will be removed in the version 5.0.0,because filterServer was removed,and method
- * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
- */
- @Deprecated
- private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
- final String topic,
- final String filterClassSource) {
- byte[] classBody = null;
- int classCRC = 0;
- try {
- classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
- classCRC = UtilAll.crc32(classBody);
- } catch (Exception e1) {
- log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",
- fullClassName,
- UtilAll.exceptionSimpleDesc(e1));
- }
-
- TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
- if (topicRouteData != null
- && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
- for (Entry<String, List<String>> next : topicRouteData.getFilterServerTable().entrySet()) {
- List<String> value = next.getValue();
- for (final String fsAddr : value) {
- try {
- this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
- 5000);
-
- log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
- topic, fullClassName);
-
- } catch (Exception e) {
- log.error("uploadFilterClassToAllFilterServer Exception", e);
- }
- }
- }
- } else {
- log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
- consumerGroup, topic, fullClassName);
- }
- }
-
private boolean isNeedUpdateTopicRouteInfo(final String topic) {
boolean result = false;
Iterator<Entry<String, MQProducerInner>> producerIterator = this.producerTable.entrySet().iterator();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 08fbcb521..07640232f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -112,8 +112,6 @@ public class BrokerConfig extends BrokerIdentity {
private int adminBrokerThreadPoolQueueCapacity = 10000;
private int loadBalanceThreadPoolQueueCapacity = 100000;
- private int filterServerNums = 0;
-
private boolean longPollingEnable = true;
private long shortPollingTimeMills = 1000;
@@ -925,14 +923,6 @@ public class BrokerConfig extends BrokerIdentity {
this.brokerTopicEnable = brokerTopicEnable;
}
- public int getFilterServerNums() {
- return filterServerNums;
- }
-
- public void setFilterServerNums(int filterServerNums) {
- this.filterServerNums = filterServerNums;
- }
-
public boolean isLongPollingEnable() {
return longPollingEnable;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java
deleted file mode 100644
index 14dacf6ec..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.protocol.header.filtersrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RegisterFilterServerRequestHeader implements CommandCustomHeader {
- @CFNotNull
- private String filterServerAddr;
-
- @Override
- public void checkFields() throws RemotingCommandException {
- }
-
- public String getFilterServerAddr() {
- return filterServerAddr;
- }
-
- public void setFilterServerAddr(String filterServerAddr) {
- this.filterServerAddr = filterServerAddr;
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
deleted file mode 100644
index a618a4f30..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.protocol.header.filtersrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RegisterFilterServerResponseHeader implements CommandCustomHeader {
- @CFNotNull
- private String brokerName;
- @CFNotNull
- private long brokerId;
-
- @Override
- public void checkFields() throws RemotingCommandException {
- }
-
- public long getBrokerId() {
- return brokerId;
- }
-
- public void setBrokerId(long brokerId) {
- this.brokerId = brokerId;
- }
-
- public String getBrokerName() {
- return brokerName;
- }
-
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
deleted file mode 100644
index b214ee5cd..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.protocol.header.filtersrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RegisterMessageFilterClassRequestHeader implements CommandCustomHeader {
- @CFNotNull
- private String consumerGroup;
- @CFNotNull
- private String topic;
- @CFNotNull
- private String className;
- @CFNotNull
- private Integer classCRC;
-
- @Override
- public void checkFields() throws RemotingCommandException {
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getClassName() {
- return className;
- }
-
- public void setClassName(String className) {
- this.className = className;
- }
-
- public Integer getClassCRC() {
- return classCRC;
- }
-
- public void setClassCRC(Integer classCRC) {
- this.classCRC = classCRC;
- }
-}