You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:43:58 UTC
[02/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
deleted file mode 100644
index 3921c92..0000000
--- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.filtersrv.stats;
-
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.stats.StatsItemSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-
-public class FilterServerStatsManager {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
- private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
-
- // ConsumerGroup Get Nums
- private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS",
- this.scheduledExecutorService, log);
-
- // ConsumerGroup Get Size
- private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE",
- this.scheduledExecutorService, log);
-
-
- public FilterServerStatsManager() {
- }
-
-
- public void start() {
- }
-
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-
-
- public void incGroupGetNums(final String group, final String topic, final int incValue) {
- this.groupGetNums.addValue(topic + "@" + group, incValue, 1);
- }
-
-
- public void incGroupGetSize(final String group, final String topic, final int incValue) {
- this.groupGetSize.addValue(topic + "@" + group, incValue, 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
new file mode 100644
index 0000000..1663dfc
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.filtersrv;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+
+/**
+ * Remoting client the filter server uses for its outbound calls to a broker.
+ *
+ * @author shijia.wxr
+ */
+public class FilterServerOuterAPI {
+    private final RemotingClient remotingClient;
+
+    public FilterServerOuterAPI() {
+        this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
+    }
+
+    /** Starts the underlying remoting client. */
+    public void start() {
+        this.remotingClient.start();
+    }
+
+    /** Shuts down the underlying remoting client. */
+    public void shutdown() {
+        this.remotingClient.shutdown();
+    }
+
+    /**
+     * Sends a REGISTER_FILTER_SERVER request to the broker, passing 3000 as the invokeSync timeout.
+     *
+     * @param brokerAddr address of the broker to call
+     * @param filterServerAddr address written into the request header
+     * @return the decoded response header when the broker answers SUCCESS
+     * @throws MQBrokerException with the response code and remark for any other response code
+     */
+    public RegisterFilterServerResponseHeader registerFilterServerToBroker(
+        final String brokerAddr,
+        final String filterServerAddr
+    ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQBrokerException {
+        final RegisterFilterServerRequestHeader header = new RegisterFilterServerRequestHeader();
+        header.setFilterServerAddr(filterServerAddr);
+
+        final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr,
+            RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, header), 3000);
+        assert reply != null;
+
+        if (ResponseCode.SUCCESS != reply.getCode()) {
+            throw new MQBrokerException(reply.getCode(), reply.getRemark());
+        }
+        return (RegisterFilterServerResponseHeader) reply
+            .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
new file mode 100644
index 0000000..ec0381d
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+// Settings bean for the filter server: private fields with plain getters and setters.
+// FiltersrvStartup fills an instance via MixAll.properties2Object (config file, then command line).
+public class FiltersrvConfig {
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
+ System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ // Like rocketmqHome above: the system property wins, the environment variable is the fallback.
+ @ImportantField
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
+ System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ // Broker address FiltersrvController registers with, and the IP part of its own advertised address.
+ private String connectWhichBroker = "127.0.0.1:10911";
+ private String filterServerIP = RemotingUtil.getLocalAddress();
+ // NOTE(review): presumably the body-size threshold (bytes) for compression and the zip level - confirm.
+ private int compressMsgBodyOverHowmuch = 1024 * 8;
+ private int zipCompressLevel = 5;
+
+ // NOTE(review): presumably lets clients upload filter class source - confirm against the processor.
+ private boolean clientUploadFilterClassEnable = true;
+
+ // NOTE(review): presumably where filter classes are fetched from; the default is a fixed external host.
+ private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass";
+ // FiltersrvStartup.createController copies these three values into NettyServerConfig.
+ private int fsServerAsyncSemaphoreValue = 2048;
+ private int fsServerCallbackExecutorThreads = 64;
+ private int fsServerWorkerThreads = 64;
+
+ // Accessors below only read or assign the field; none of them validates its argument.
+ public String getRocketmqHome() {
+ return rocketmqHome;
+ }
+
+
+ public void setRocketmqHome(String rocketmqHome) {
+ this.rocketmqHome = rocketmqHome;
+ }
+
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+
+ public String getConnectWhichBroker() {
+ return connectWhichBroker;
+ }
+
+
+ public void setConnectWhichBroker(String connectWhichBroker) {
+ this.connectWhichBroker = connectWhichBroker;
+ }
+
+
+ public String getFilterServerIP() {
+ return filterServerIP;
+ }
+
+
+ public void setFilterServerIP(String filterServerIP) {
+ this.filterServerIP = filterServerIP;
+ }
+
+
+ public int getCompressMsgBodyOverHowmuch() {
+ return compressMsgBodyOverHowmuch;
+ }
+
+
+ public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
+ this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+ }
+
+
+ public int getZipCompressLevel() {
+ return zipCompressLevel;
+ }
+
+
+ public void setZipCompressLevel(int zipCompressLevel) {
+ this.zipCompressLevel = zipCompressLevel;
+ }
+
+
+ public boolean isClientUploadFilterClassEnable() {
+ return clientUploadFilterClassEnable;
+ }
+
+
+ public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) {
+ this.clientUploadFilterClassEnable = clientUploadFilterClassEnable;
+ }
+
+
+ public String getFilterClassRepertoryUrl() {
+ return filterClassRepertoryUrl;
+ }
+
+
+ public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) {
+ this.filterClassRepertoryUrl = filterClassRepertoryUrl;
+ }
+
+
+ public int getFsServerAsyncSemaphoreValue() {
+ return fsServerAsyncSemaphoreValue;
+ }
+
+
+ public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) {
+ this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue;
+ }
+
+
+ public int getFsServerCallbackExecutorThreads() {
+ return fsServerCallbackExecutorThreads;
+ }
+
+
+ public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) {
+ this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads;
+ }
+
+
+ public int getFsServerWorkerThreads() {
+ return fsServerWorkerThreads;
+ }
+
+
+ public void setFsServerWorkerThreads(int fsServerWorkerThreads) {
+ this.fsServerWorkerThreads = fsServerWorkerThreads;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
new file mode 100644
index 0000000..cb862a6
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.filtersrv;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
+import org.apache.rocketmq.filtersrv.filter.FilterClassManager;
+import org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor;
+import org.apache.rocketmq.filtersrv.stats.FilterServerStatsManager;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Wires the filter server together: remoting server, broker registration and pull consumer.
+ * @author shijia.wxr
+ */
+public class FiltersrvController {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    private final FiltersrvConfig filtersrvConfig;
+    private final NettyServerConfig nettyServerConfig;
+    private final FilterClassManager filterClassManager;
+
+    private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();
+    private final DefaultMQPullConsumer defaultMQPullConsumer =
+        new DefaultMQPullConsumer(MixAll.FILTERSRV_CONSUMER_GROUP);
+
+    // Runs the periodic broker registration that initialize() schedules.
+    private final ScheduledExecutorService scheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
+    private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();
+
+    // Both are assigned in initialize() (or through the setters) and are null before that.
+    private RemotingServer remotingServer;
+    private ExecutorService remotingExecutor;
+    // Set from the first successful registration response; later responses do not overwrite it.
+    private volatile String brokerName = null;
+
+    /** Stores both configs and creates the FilterClassManager bound to this controller. */
+    public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
+        this.filtersrvConfig = filtersrvConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.filterClassManager = new FilterClassManager(this);
+    }
+
+    /**
+     * Creates the remoting server and its worker pool, schedules broker registration (first run
+     * after 3 seconds, then every 10 seconds) and configures the pull consumer.
+     *
+     * @return always {@code true}
+     */
+    public boolean initialize() {
+        MixAll.printObjectProperties(log, this.filtersrvConfig);
+
+        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);
+        final int workerThreads = this.nettyServerConfig.getServerWorkerThreads();
+        this.remotingExecutor = Executors.newFixedThreadPool(workerThreads,
+            new ThreadFactoryImpl("RemotingExecutorThread_"));
+        this.registerProcessor();
+
+        final Runnable registerTask = new Runnable() {
+            @Override
+            public void run() {
+                FiltersrvController.this.registerFilterServerToBroker();
+            }
+        };
+        this.scheduledExecutorService.scheduleAtFixedRate(registerTask, 3, 10, TimeUnit.SECONDS);
+
+        // Both suspend-related timeouts are lowered by 1000 ms from their current values.
+        final DefaultMQPullConsumer consumer = this.defaultMQPullConsumer;
+        consumer.setBrokerSuspendMaxTimeMillis(consumer.getBrokerSuspendMaxTimeMillis() - 1000);
+        consumer.setConsumerTimeoutMillisWhenSuspend(
+            consumer.getConsumerTimeoutMillisWhenSuspend() - 1000);
+        consumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
+        consumer.setInstanceName(String.valueOf(UtilAll.getPid()));
+
+        return true;
+    }
+
+    private void registerProcessor() {
+        final DefaultRequestProcessor processor = new DefaultRequestProcessor(this);
+        this.remotingServer.registerDefaultProcessor(processor, this.remotingExecutor);
+    }
+
+    /**
+     * Registers this filter server with the configured broker, then records the returned broker
+     * id and, the first time only, the broker name. Any exception is logged and ends the JVM.
+     */
+    public void registerFilterServerToBroker() {
+        try {
+            final RegisterFilterServerResponseHeader header = this.filterServerOuterAPI
+                .registerFilterServerToBroker(this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
+            this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
+                .setDefaultBrokerId(header.getBrokerId());
+            if (this.brokerName == null) {
+                this.brokerName = header.getBrokerName();
+            }
+
+            log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
+                this.localAddr(), this.filtersrvConfig.getConnectWhichBroker(),
+                header.getBrokerName(), header.getBrokerId());
+        } catch (Exception e) {
+            log.warn("register filter server Exception", e);
+            log.warn("access broker failed, kill oneself");
+            System.exit(-1);
+        }
+    }
+
+    /** Formats the configured filter server IP and the remoting server's listen port as ip:port. */
+    public String localAddr() {
+        final String ip = this.filtersrvConfig.getFilterServerIP();
+        return String.format("%s:%d", ip, this.remotingServer.localListenPort());
+    }
+
+    /** Starts the components in a fixed order, pull consumer first. */
+    public void start() throws Exception {
+        this.defaultMQPullConsumer.start();
+        this.remotingServer.start();
+        this.filterServerOuterAPI.start();
+        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
+            .setConnectBrokerByUser(true);
+        this.filterClassManager.start();
+        this.filterServerStatsManager.start();
+    }
+
+    /** Shuts the components down in a fixed order, remoting server first. */
+    public void shutdown() {
+        this.remotingServer.shutdown();
+        this.remotingExecutor.shutdown();
+        this.scheduledExecutorService.shutdown();
+        this.defaultMQPullConsumer.shutdown();
+        this.filterServerOuterAPI.shutdown();
+        this.filterClassManager.shutdown();
+        this.filterServerStatsManager.shutdown();
+    }
+
+    /** Returns the remoting server; null until initialize() or the setter assigns it. */
+    public RemotingServer getRemotingServer() {
+        return this.remotingServer;
+    }
+
+    /** Replaces the remoting server instance. */
+    public void setRemotingServer(RemotingServer remotingServer) {
+        this.remotingServer = remotingServer;
+    }
+
+    /** Returns the executor registered with the default processor; null until assigned. */
+    public ExecutorService getRemotingExecutor() {
+        return this.remotingExecutor;
+    }
+
+    /** Replaces the remoting executor instance. */
+    public void setRemotingExecutor(ExecutorService remotingExecutor) {
+        this.remotingExecutor = remotingExecutor;
+    }
+
+    public FiltersrvConfig getFiltersrvConfig() {
+        return this.filtersrvConfig;
+    }
+
+    public NettyServerConfig getNettyServerConfig() {
+        return this.nettyServerConfig;
+    }
+
+    /** Returns the single-thread scheduler that runs the broker registration task. */
+    public ScheduledExecutorService getScheduledExecutorService() {
+        return this.scheduledExecutorService;
+    }
+
+    /** Returns the client used for outbound calls to the broker. */
+    public FilterServerOuterAPI getFilterServerOuterAPI() {
+        return this.filterServerOuterAPI;
+    }
+
+    /** Returns the filter class manager created in the constructor. */
+    public FilterClassManager getFilterClassManager() {
+        return this.filterClassManager;
+    }
+
+    /** Returns the pull consumer configured by initialize(). */
+    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
+        return this.defaultMQPullConsumer;
+    }
+
+    /** Returns the broker name; null until a registration succeeds or the setter is called. */
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    /** Overrides the recorded broker name. */
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    /** Returns the statistics manager owned by this controller. */
+    public FilterServerStatsManager getFilterServerStatsManager() {
+        return this.filterServerStatsManager;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
new file mode 100644
index 0000000..4e1fbc4
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.filtersrv;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Command-line entry point that builds, initializes and starts a FiltersrvController.
+ * @author shijia.wxr
+ */
+public class FiltersrvStartup {
+    public static Logger log;
+
+    public static void main(String[] args) {
+        start(createController(args));
+    }
+
+    /** Starts the controller (exit status -1 if that throws), reports its address and returns it. */
+    public static FiltersrvController start(FiltersrvController controller) {
+        try {
+            controller.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        String tip = "The Filter Server boot success, " + controller.localAddr();
+        log.info(tip);
+        System.out.printf("%s%n", tip);
+
+        return controller;
+    }
+
+    /**
+     * Parses the command line, loads the optional -c properties file, configures logback and
+     * returns an initialized controller with a shutdown hook; failures end in System.exit.
+     */
+    public static FiltersrvController createController(String[] args) {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+            NettySystemConfig.socketSndbufSize = 65535;
+        }
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+            NettySystemConfig.socketRcvbufSize = 1024;
+        }
+
+        try {
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            final CommandLine commandLine = ServerUtil.parseCmdLine("mqfiltersrv", args,
+                buildCommandlineOptions(options), new PosixParser());
+            if (null == commandLine) {
+                System.exit(-1);
+                return null;
+            }
+
+            final FiltersrvConfig filtersrvConfig = new FiltersrvConfig();
+            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+
+            if (commandLine.hasOption('c')) {
+                String file = commandLine.getOptionValue('c');
+                if (file != null) {
+                    Properties properties = new Properties();
+                    InputStream in = new BufferedInputStream(new FileInputStream(file));
+                    try {
+                        properties.load(in);
+                    } finally {
+                        in.close(); // released even when load() throws
+                    }
+                    MixAll.properties2Object(properties, filtersrvConfig);
+                    // The path is a printf argument so that a '%' in it is not parsed as a specifier.
+                    System.out.printf("load config properties file OK, %s%n", file);
+                    String port = properties.getProperty("listenPort");
+                    if (port != null) {
+                        filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port));
+                    }
+                }
+            }
+
+            nettyServerConfig.setListenPort(0);
+            nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue());
+            nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig.getFsServerCallbackExecutorThreads());
+            nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads());
+
+            if (commandLine.hasOption('p')) {
+                MixAll.printObjectProperties(null, filtersrvConfig);
+                MixAll.printObjectProperties(null, nettyServerConfig);
+                System.exit(0);
+            }
+
+            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);
+            if (null == filtersrvConfig.getRocketmqHome()) {
+                System.out.printf("Please set the %s variable in your environment to match the location"
+                    + " of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
+                System.exit(-2);
+            }
+
+            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            JoranConfigurator configurator = new JoranConfigurator();
+            configurator.setContext(lc);
+            lc.reset();
+            configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml");
+            log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+            final FiltersrvController controller = new FiltersrvController(filtersrvConfig, nettyServerConfig);
+            if (!controller.initialize()) {
+                controller.shutdown();
+                System.exit(-3);
+            }
+
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+                private volatile boolean hasShutdown = false;
+                private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+                @Override
+                public void run() {
+                    synchronized (this) {
+                        log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
+                        if (!this.hasShutdown) {
+                            this.hasShutdown = true;
+                            long begineTime = System.currentTimeMillis();
+                            controller.shutdown();
+                            long consumingTimeTotal = System.currentTimeMillis() - begineTime;
+                            log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
+                        }
+                    }
+                }
+            }, "ShutdownHook"));
+
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+        return null;
+    }
+
+    /** Adds the filter server's own -c (config file) and -p (print config) options. */
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Filter server config properties file");
+        opt.setRequired(false);
+        options.addOption(opt);
+        opt = new Option("p", "printConfigItem", false, "Print all config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
new file mode 100644
index 0000000..fd95685
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.filter;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.util.*;
+
+
+ /**
+  * Compiles Java source code supplied as strings and loads the resulting classes.
+  * <p>
+  * Source text is written beneath {@code sourcePath}, compiled with the system Java compiler
+  * into {@code outPutClassPath}, and loaded through a {@link URLClassLoader} whose parent is
+  * {@code parentClassLoader}.
+  */
+ public class DynaCode {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+ /** Value of the "file.separator" system property, used when building directory and file paths. */
+ private static final String FILE_SP = System.getProperty("file.separator");
+ /** Value of the "line.separator" system property, used to split source text into lines. */
+ private static final String LINE_SP = System.getProperty("line.separator");
+ /** Root directory for generated source files; the last path element is UtilAll.getPid() (presumably the process id - confirm). */
+ private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP
+ + UtilAll.getPid();
+ /** Compiler output directory (passed to javac as -d); defaults to the source directory. */
+ private String outPutClassPath = sourcePath;
+
+ /** Parent of the class loader that loads the compiled classes. */
+ private ClassLoader parentClassLoader;
+
+ /** Source code of the classes to compile, one complete compilation unit per element. */
+ private List<String> codeStrs;
+
+ /** Classes handled by this instance; values are null until the class has been loaded. */
+ private Map<String/* fullClassName */, Class<?>/* class */> loadClass;
+
+ /** Passed to javac as -classpath when not blank. */
+ private String classpath;
+
+ /** Passed to javac as -bootclasspath when not blank. */
+ private String bootclasspath;
+
+ /** Passed to javac as -extdirs when not blank. */
+ private String extdirs;
+
+ /** Encoding used to write the source files and passed to javac as -encoding. */
+ private String encoding = "UTF-8";
+
+ /** Passed to javac as -target when not blank. */
+ private String target;
+
+
+ /**
+  * Creates a compiler for a single source string, using the current thread's context class
+  * loader as parent.
+  *
+  * @param code complete source of one compilation unit
+  */
+ @SuppressWarnings("unchecked")
+ public DynaCode(String code) {
+     this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code));
+ }
+
+
+ /**
+  * Creates a compiler whose compile classpath is derived from the given class loader chain.
+  *
+  * @param parentClassLoader parent for the loader of the compiled classes; also the source
+  *                          of the compile classpath
+  * @param codeStrs          sources to compile
+  */
+ public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) {
+     this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs);
+ }
+
+
+ /**
+  * Creates a compiler with an explicit compile classpath.
+  *
+  * @param classpath         value handed to javac as -classpath
+  * @param parentClassLoader parent for the loader of the compiled classes
+  * @param codeStrs          sources to compile; must not be null because its size is used to
+  *                          pre-size the result map
+  */
+ public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) {
+     this.codeStrs = codeStrs;
+     this.parentClassLoader = parentClassLoader;
+     this.classpath = classpath;
+     this.loadClass = new HashMap<String, Class<?>>(codeStrs.size());
+ }
+
+
+ /**
+  * Builds a classpath string from the URLs of the given class loader and all of its parents.
+  * <p>
+  * Only loaders that are {@link URLClassLoader} instances contribute entries. Each URL's file
+  * part is URL-decoded as UTF-8 and converted to an absolute file path; entries are joined with
+  * {@link File#pathSeparatorChar}.
+  *
+  * @param cl first class loader of the chain to inspect; may be null
+  * @return the joined classpath, or an empty string when no URL was found
+  */
+ private static String extractClasspath(ClassLoader cl) {
+     StringBuilder buf = new StringBuilder();
+     for (ClassLoader loader = cl; loader != null; loader = loader.getParent()) {
+         if (!(loader instanceof URLClassLoader)) {
+             continue;
+         }
+         for (URL url : ((URLClassLoader) loader).getURLs()) {
+             String path;
+             try {
+                 path = URLDecoder.decode(url.getFile(), "UTF-8");
+             } catch (UnsupportedEncodingException e) {
+                 // Entry is skipped. The separator is only appended below, after a successful
+                 // decode, so a skipped entry can no longer leave a dangling separator behind.
+                 continue;
+             }
+             if (buf.length() > 0) {
+                 buf.append(File.pathSeparatorChar);
+             }
+             buf.append(new File(path).getAbsolutePath());
+         }
+     }
+     return buf.toString();
+ }
+
+
+ /**
+  * Creates a compiler for several sources, using the current thread's context class loader
+  * as parent.
+  *
+  * @param codeStrs sources to compile
+  */
+ public DynaCode(List<String> codeStrs) {
+     this(Thread.currentThread().getContextClassLoader(), codeStrs);
+ }
+
+ /**
+  * Compiles a single source file under a fresh, time-stamped class name and returns the loaded
+  * class.
+  * <p>
+  * Every occurrence of the simple class name in the source is replaced by the simple name
+  * followed by {@link System#currentTimeMillis()}, so repeated compilations of the same class
+  * produce distinct class names.
+  *
+  * @param className  name of the class declared in {@code javaSource}; its simple name is
+  *                   obtained through {@code FilterAPI.simpleClassName}
+  * @param javaSource complete source of the class
+  * @return the compiled and loaded class, never null
+  * @throws ClassNotFoundException if no class name could be derived from the rewritten source,
+  *                                so nothing was compiled
+  * @throws Exception              if writing, compiling or loading the class fails
+  */
+ public static Class<?> compileAndLoadClass(final String className, final String javaSource)
+     throws Exception {
+     final String classSimpleName = FilterAPI.simpleClassName(className);
+     final String newClassSimpleName = classSimpleName + System.currentTimeMillis();
+     // Literal replacement: replaceAll() would interpret the class name as a regular expression
+     // and the replacement as a template, which misbehaves for identifiers containing '$'.
+     final String newJavaCode = javaSource.replace(classSimpleName, newClassSimpleName);
+
+     List<String> codes = new ArrayList<String>();
+     codes.add(newJavaCode);
+     DynaCode dc = new DynaCode(codes);
+     dc.compileAndLoadClass();
+
+     final String qualifiedName = getQualifiedName(newJavaCode);
+     Class<?> clazz = dc.getLoadClass().get(qualifiedName);
+     if (null == clazz) {
+         // Fail here with a clear message instead of handing null to callers, which
+         // instantiate the result immediately.
+         throw new ClassNotFoundException(
+             "No class was compiled for " + className + ", derived class name: " + qualifiedName);
+     }
+     return clazz;
+ }
+
+ /**
+  * Runs the whole pipeline for the configured sources: writes them to disk, compiles them and
+  * loads every class that was registered while writing.
+  *
+  * @throws Exception if any of the three steps fails
+  */
+ public void compileAndLoadClass() throws Exception {
+     final String[] writtenSources = uploadSrcFile();
+     compile(writtenSources);
+     loadClass(loadClass.keySet());
+ }
+
+ /**
+  * @return the live map from full class name to loaded class; a value is null until the
+  *         corresponding class has been loaded
+  */
+ public Map<String, Class<?>> getLoadClass() {
+     return this.loadClass;
+ }
+
+ /**
+  * Derives the qualified name of the type declared in the given source.
+  *
+  * @param code complete source of one compilation unit
+  * @return "package.ClassName", just "ClassName" when no package is declared, or an empty
+  *         string when no class name can be extracted
+  */
+ public static String getQualifiedName(String code) {
+     final String simpleName = getClassName(code);
+     if (StringUtils.isBlank(simpleName)) {
+         return "";
+     }
+
+     final String packageName = getPackageName(code);
+     if (StringUtils.isBlank(packageName)) {
+         return simpleName;
+     }
+     return packageName + "." + simpleName;
+ }
+
+ private String[] uploadSrcFile() throws Exception {
+ List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size());
+ for (String code : codeStrs) {
+ if (StringUtils.isNotBlank(code)) {
+ String packageName = getPackageName(code);
+ String className = getClassName(code);
+ if (StringUtils.isNotBlank(className)) {
+ File srcFile = null;
+ BufferedWriter bufferWriter = null;
+ try {
+ if (StringUtils.isBlank(packageName)) {
+ File pathFile = new File(sourcePath);
+
+ if (!pathFile.exists()) {
+ if (!pathFile.mkdirs()) {
+ throw new RuntimeException("create PathFile Error!");
+ }
+ }
+ srcFile = new File(sourcePath + FILE_SP + className + ".java");
+ } else {
+ String srcPath = StringUtils.replace(packageName, ".", FILE_SP);
+ File pathFile = new File(sourcePath + FILE_SP + srcPath);
+
+ if (!pathFile.exists()) {
+ if (!pathFile.mkdirs()) {
+ throw new RuntimeException("create PathFile Error!");
+ }
+ }
+ srcFile = new File(pathFile.getAbsolutePath() + FILE_SP + className + ".java");
+ }
+ synchronized (loadClass) {
+ loadClass.put(getFullClassName(code), null);
+ }
+ if (null != srcFile) {
+ LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath());
+ srcFileAbsolutePaths.add(srcFile.getAbsolutePath());
+ srcFile.deleteOnExit();
+ }
+ OutputStreamWriter outputStreamWriter =
+ new OutputStreamWriter(new FileOutputStream(srcFile), encoding);
+ bufferWriter = new BufferedWriter(outputStreamWriter);
+ for (String lineCode : code.split(LINE_SP)) {
+ bufferWriter.write(lineCode);
+ bufferWriter.newLine();
+ }
+ bufferWriter.flush();
+ } finally {
+ if (null != bufferWriter) {
+ bufferWriter.close();
+ }
+ }
+ }
+ }
+ }
+ return srcFileAbsolutePaths.toArray(new String[srcFileAbsolutePaths.size()]);
+ }
+
+ private void compile(String[] srcFiles) throws Exception {
+ String args[] = this.buildCompileJavacArgs(srcFiles);
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ if (compiler == null) {
+ throw new NullPointerException(
+ "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!");
+ }
+ int resultCode = compiler.run(null, null, err, args);
+ if (resultCode != 0) {
+ throw new Exception(err.toString(RemotingHelper.DEFAULT_CHARSET));
+ }
+ }
+
+ private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException {
+ synchronized (loadClass) {
+ ClassLoader classLoader =
+ new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()},
+ parentClassLoader);
+ for (String key : classFullNames) {
+ Class<?> classz = classLoader.loadClass(key);
+ if (null != classz) {
+ loadClass.put(key, classz);
+ LOGGER.info("Dyna Load Java Class File OK:----> className: " + key);
+ } else {
+ LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key);
+ }
+ }
+ }
+ }
+
+ /**
+  * Extracts the simple name of the first type declared in the given source.
+  * <p>
+  * Only the text before the first "{" (the declaration header) is inspected. The name is the
+  * text following " class ", " interface " or " enum " (checked in that order), cut at a
+  * following " extends " or " implements " clause.
+  *
+  * @param code complete source of one compilation unit
+  * @return the declared type name; the blank header itself (possibly null) when there is no
+  *         text before the first "{"; or an empty string when the header declares no class,
+  *         interface or enum
+  */
+ public static String getClassName(String code) {
+     final String header = StringUtils.substringBefore(code, "{");
+     if (StringUtils.isBlank(header)) {
+         return header;
+     }
+
+     // The keyword is looked up in the header only. Testing the whole source would let a
+     // " class " inside the body of an interface or enum select the wrong branch and yield an
+     // empty name.
+     final String declaration;
+     if (StringUtils.contains(header, " class ")) {
+         declaration = StringUtils.substringAfter(header, " class ");
+     } else if (StringUtils.contains(header, " interface ")) {
+         declaration = StringUtils.substringAfter(header, " interface ");
+     } else if (StringUtils.contains(header, " enum ")) {
+         declaration = StringUtils.substringAfter(header, " enum ");
+     } else {
+         return StringUtils.EMPTY;
+     }
+
+     // "extends" always precedes "implements", so cutting at the first one present is enough.
+     if (StringUtils.contains(declaration, " extends ")) {
+         return StringUtils.substringBefore(declaration, " extends ").trim();
+     }
+     if (StringUtils.contains(declaration, " implements ")) {
+         return StringUtils.trim(StringUtils.substringBefore(declaration, " implements "));
+     }
+     return StringUtils.trim(declaration);
+ }
+
+ /**
+  * Extracts the package name declared in the given source.
+  *
+  * @param code complete source of one compilation unit
+  * @return the trimmed text between the first "package " and the next ";", or an empty string
+  *         when the source contains no "package "
+  */
+ public static String getPackageName(String code) {
+     final String afterKeyword = StringUtils.substringAfter(code, "package ");
+     final String beforeSemicolon = StringUtils.substringBefore(afterKeyword, ";");
+     return beforeSemicolon.trim();
+ }
+
+ /**
+  * Combines the declared package and class name of the given source.
+  *
+  * @param code complete source of one compilation unit
+  * @return "package.ClassName", or just the class name when no package is declared
+  */
+ public static String getFullClassName(String code) {
+     final String packageName = getPackageName(code);
+     final String className = getClassName(code);
+     if (StringUtils.isBlank(packageName)) {
+         return className;
+     }
+     return packageName + "." + className;
+ }
+
+ private String[] buildCompileJavacArgs(String srcFiles[]) {
+ ArrayList<String> args = new ArrayList<String>();
+ if (StringUtils.isNotBlank(classpath)) {
+ args.add("-classpath");
+ args.add(classpath);
+ }
+ if (StringUtils.isNotBlank(outPutClassPath)) {
+ args.add("-d");
+ args.add(outPutClassPath);
+ }
+ if (StringUtils.isNotBlank(sourcePath)) {
+ args.add("-sourcepath");
+ args.add(sourcePath);
+ }
+ if (StringUtils.isNotBlank(bootclasspath)) {
+ args.add("-bootclasspath");
+ args.add(bootclasspath);
+ }
+ if (StringUtils.isNotBlank(extdirs)) {
+ args.add("-extdirs");
+ args.add(extdirs);
+ }
+ if (StringUtils.isNotBlank(encoding)) {
+ args.add("-encoding");
+ args.add(encoding);
+ }
+ if (StringUtils.isNotBlank(target)) {
+ args.add("-target");
+ args.add(target);
+ }
+ for (int i = 0; i < srcFiles.length; i++) {
+ args.add(srcFiles[i]);
+ }
+ return args.toArray(new String[args.size()]);
+ }
+
+ /** @return the directory compiled classes are written to and loaded from */
+ public String getOutPutClassPath() {
+     return this.outPutClassPath;
+ }
+
+ /** @param outPutClassPath directory compiled classes are written to and loaded from */
+ public void setOutPutClassPath(String outPutClassPath) {
+     this.outPutClassPath = outPutClassPath;
+ }
+
+ /** @return the root directory generated source files are written to */
+ public String getSourcePath() {
+     return this.sourcePath;
+ }
+
+ /** @param sourcePath root directory generated source files are written to */
+ public void setSourcePath(String sourcePath) {
+     this.sourcePath = sourcePath;
+ }
+
+ /** @return the parent of the class loader used for the compiled classes */
+ public ClassLoader getParentClassLoader() {
+     return this.parentClassLoader;
+ }
+
+ /** @param parentClassLoader parent of the class loader used for the compiled classes */
+ public void setParentClassLoader(ClassLoader parentClassLoader) {
+     this.parentClassLoader = parentClassLoader;
+ }
+
+ /** @return the value passed to javac as -classpath */
+ public String getClasspath() {
+     return this.classpath;
+ }
+
+ /** @param classpath value passed to javac as -classpath */
+ public void setClasspath(String classpath) {
+     this.classpath = classpath;
+ }
+
+ /** @return the value passed to javac as -bootclasspath */
+ public String getBootclasspath() {
+     return this.bootclasspath;
+ }
+
+ /** @param bootclasspath value passed to javac as -bootclasspath */
+ public void setBootclasspath(String bootclasspath) {
+     this.bootclasspath = bootclasspath;
+ }
+
+ /** @return the value passed to javac as -extdirs */
+ public String getExtdirs() {
+     return this.extdirs;
+ }
+
+ /** @param extdirs value passed to javac as -extdirs */
+ public void setExtdirs(String extdirs) {
+     this.extdirs = extdirs;
+ }
+
+ /** @return the encoding used for the source files and passed to javac as -encoding */
+ public String getEncoding() {
+     return this.encoding;
+ }
+
+ /** @param encoding encoding used for the source files and passed to javac as -encoding */
+ public void setEncoding(String encoding) {
+     this.encoding = encoding;
+ }
+
+ /** @return the value passed to javac as -target */
+ public String getTarget() {
+     return this.target;
+ }
+
+ /** @param target value passed to javac as -target */
+ public void setTarget(String target) {
+     this.target = target;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
new file mode 100644
index 0000000..36d6b7e
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.filter;
+
+ /**
+  * Strategy for obtaining the Java source of a message filter class.
+  */
+ public interface FilterClassFetchMethod {
+     /**
+      * Fetches the source code of a filter class.
+      *
+      * @param topic         topic the filter belongs to
+      * @param consumerGroup consumer group the filter belongs to
+      * @param className     name of the filter class
+      * @return the source text as returned by the implementation
+      */
+     String fetch(String topic, String consumerGroup, String className);
+ }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
new file mode 100644
index 0000000..d278fe3
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.filter;
+
+import org.apache.rocketmq.common.filter.MessageFilter;
+
+
+ /**
+  * Mutable holder describing one registered filter class: its name, the CRC of its source and
+  * the filter instance created from it.
+  */
+ public class FilterClassInfo {
+     private String className;
+     private int classCRC;
+     private MessageFilter messageFilter;
+
+
+     /** @return the CRC recorded for the filter class source */
+     public int getClassCRC() {
+         return this.classCRC;
+     }
+
+
+     /** @param classCRC CRC to record for the filter class source */
+     public void setClassCRC(int classCRC) {
+         this.classCRC = classCRC;
+     }
+
+
+     /** @return the filter instance, or null if none has been set */
+     public MessageFilter getMessageFilter() {
+         return this.messageFilter;
+     }
+
+
+     /** @param messageFilter filter instance to store; may be null */
+     public void setMessageFilter(MessageFilter messageFilter) {
+         this.messageFilter = messageFilter;
+     }
+
+
+     /** @return the name of the filter class */
+     public String getClassName() {
+         return this.className;
+     }
+
+
+     /** @param className name of the filter class */
+     public void setClassName(String className) {
+         this.className = className;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
new file mode 100644
index 0000000..3269852
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.filter;
+
+ /**
+  * Class loader that exposes {@link ClassLoader#defineClass(String, byte[], int, int)} publicly
+  * so classes can be defined directly from byte code.
+  */
+ public class FilterClassLoader extends ClassLoader {
+     /**
+      * Defines a class from a region of a byte array.
+      *
+      * @param name expected binary name of the class
+      * @param b    array containing the class data
+      * @param off  offset of the class data within {@code b}
+      * @param len  length of the class data
+      * @return the newly defined class
+      * @throws ClassFormatError if the data is not a valid class
+      */
+     public final Class<?> createNewClass(String name, byte[] b, int off, int len) throws ClassFormatError {
+         final Class<?> defined = defineClass(name, b, off, len);
+         return defined;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
new file mode 100644
index 0000000..fab4d7d
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.filter;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.MessageFilter;
+import org.apache.rocketmq.filtersrv.FiltersrvController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+ /**
+  * Keeps the table of registered filter classes, keyed by topic and consumer group.
+  * <p>
+  * Depending on the filter server configuration, filter source code is either compiled when a
+  * client registers it or re-fetched periodically through a {@link FilterClassFetchMethod}.
+  */
+ public class FilterClassManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+ /** Monitor held while a newly registered filter class is compiled and stored. */
+ private final Object compileLock = new Object();
+ private final FiltersrvController filtersrvController;
+ /** Single-threaded scheduler used by start() to run the periodic remote fetch. */
+ private final ScheduledExecutorService scheduledExecutorService = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
+ private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
+ new ConcurrentHashMap<String, FilterClassInfo>(128);
+ private FilterClassFetchMethod filterClassFetchMethod;
+
+
+ public FilterClassManager(FiltersrvController filtersrvController) {
+ this.filtersrvController = filtersrvController;
+ this.filterClassFetchMethod =
+ new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
+ .getFilterClassRepertoryUrl());
+ }
+
+
+ /**
+  * Starts the periodic remote fetch of filter classes.
+  * <p>
+  * Nothing is scheduled when clients are allowed to upload filter classes themselves;
+  * otherwise the fetch runs every minute, beginning one minute after this call.
+  */
+ public void start() {
+     if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
+         return;
+     }
+
+     final Runnable fetchTask = new Runnable() {
+
+         @Override
+         public void run() {
+             fetchClassFromRemoteHost();
+         }
+     };
+     this.scheduledExecutorService.scheduleAtFixedRate(fetchTask, 1, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+  * Re-fetches the source of every registered filter class and recompiles those whose CRC has
+  * changed.
+  * <p>
+  * Each table entry is handled independently: a failure for one entry is logged and does not
+  * stop the remaining entries from being processed. An entry whose source cannot be fetched
+  * keeps its current filter.
+  */
+ private void fetchClassFromRemoteHost() {
+     for (Entry<String, FilterClassInfo> next : this.filterClassTable.entrySet()) {
+         try {
+             FilterClassInfo filterClassInfo = next.getValue();
+             // Keys have the form topic@consumerGroup.
+             String[] topicAndGroup = next.getKey().split("@");
+             String responseStr =
+                 this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
+                     filterClassInfo.getClassName());
+             if (null == responseStr) {
+                 // The fetch method reports failure by returning null; skip explicitly rather
+                 // than running into a NullPointerException below.
+                 log.warn("fetch Remote class File failed, no content returned, {} {}", next.getKey(),
+                     filterClassInfo.getClassName());
+                 continue;
+             }
+
+             byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
+             int classCRC = UtilAll.crc32(filterSourceBinary);
+             if (classCRC == filterClassInfo.getClassCRC()) {
+                 continue;
+             }
+
+             String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
+             Class<?> newClass =
+                 DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
+             Object newInstance = newClass.newInstance();
+             filterClassInfo.setMessageFilter((MessageFilter) newInstance);
+             filterClassInfo.setClassCRC(classCRC);
+
+             log.info("fetch Remote class File OK, {} {}", next.getKey(),
+                 filterClassInfo.getClassName());
+         } catch (Exception e) {
+             log.error("fetchClassFromRemoteHost Exception", e);
+         }
+     }
+ }
+
+ /** Shuts down the scheduler that runs the periodic remote fetch. */
+ public void shutdown() {
+     scheduledExecutorService.shutdown();
+ }
+
+ public boolean registerFilterClass(final String consumerGroup, final String topic,
+ final String className, final int classCRC, final byte[] filterSourceBinary) {
+ final String key = buildKey(consumerGroup, topic);
+
+
+ boolean registerNew = false;
+ FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
+ if (null == filterClassInfoPrev) {
+ registerNew = true;
+ } else {
+ if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
+ if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) {
+ registerNew = true;
+ }
+ }
+ }
+
+ if (registerNew) {
+ synchronized (this.compileLock) {
+ filterClassInfoPrev = this.filterClassTable.get(key);
+ if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
+ return true;
+ }
+
+ try {
+
+ FilterClassInfo filterClassInfoNew = new FilterClassInfo();
+ filterClassInfoNew.setClassName(className);
+ filterClassInfoNew.setClassCRC(0);
+ filterClassInfoNew.setMessageFilter(null);
+
+ if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
+ String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
+ Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
+ Object newInstance = newClass.newInstance();
+ filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
+ filterClassInfoNew.setClassCRC(classCRC);
+ }
+
+ this.filterClassTable.put(key, filterClassInfoNew);
+ } catch (Throwable e) {
+ String info =
+ String
+ .format(
+ "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
+ consumerGroup, topic, className);
+ log.error(info, e);
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ private static String buildKey(final String consumerGroup, final String topic) {
+ return topic + "@" + consumerGroup;
+ }
+
+ /**
+  * Looks up the filter class registered for a consumer group and topic.
+  *
+  * @return the registered entry, or null when none exists
+  */
+ public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) {
+     final String key = buildKey(consumerGroup, topic);
+     return this.filterClassTable.get(key);
+ }
+
+
+ /** @return the strategy used to fetch filter class sources */
+ public FilterClassFetchMethod getFilterClassFetchMethod() {
+     return this.filterClassFetchMethod;
+ }
+
+
+ /** @param filterClassFetchMethod strategy used to fetch filter class sources */
+ public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) {
+     this.filterClassFetchMethod = filterClassFetchMethod;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
new file mode 100644
index 0000000..c8b1515
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.filter;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.HttpTinyClient;
+import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+ /**
+  * Fetches filter class sources with an HTTP GET against a repository URL.
+  */
+ public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
+     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+     /** Base URL of the repository; the class file name is appended to it. */
+     private final String url;
+
+
+     /**
+      * @param url base URL of the source repository
+      */
+     public HttpFilterClassFetchMethod(String url) {
+         this.url = url;
+     }
+
+
+     /**
+      * Requests {@code <url>/<className>.java} and returns the response body.
+      *
+      * @param topic         topic the filter belongs to; only used for logging
+      * @param consumerGroup consumer group the filter belongs to; only used for logging
+      * @param className     name of the filter class, used to build the request URL
+      * @return the response content when the server answers with HTTP 200, otherwise null
+      */
+     @Override
+     public String fetch(String topic, String consumerGroup, String className) {
+         String thisUrl = String.format("%s/%s.java", this.url, className);
+
+         try {
+             HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000);
+             if (200 == result.code) {
+                 return result.content;
+             }
+             // Report unexpected status codes; previously they were dropped without a trace.
+             log.warn(String.format("call <%s> return code %d, Topic: %s Group: %s", thisUrl,
+                 result.code, topic, consumerGroup));
+         } catch (Exception e) {
+             log.error(
+                 String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
+         }
+
+         return null;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
new file mode 100644
index 0000000..5553952
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.filtersrv.processor;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.FilterContext;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.filtersrv.FiltersrvController;
+import org.apache.rocketmq.filtersrv.filter.FilterClassInfo;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.CommitLog;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultRequestProcessor implements NettyRequestProcessor {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+ private final FiltersrvController filtersrvController;
+
+
+    /**
+     * Creates a processor bound to the given controller.
+     *
+     * @param filtersrvController controller this processor reads its collaborators from
+     *                            (filter class manager, pull consumer, config, stats manager)
+     */
+    public DefaultRequestProcessor(FiltersrvController filtersrvController) {
+        this.filtersrvController = filtersrvController;
+    }
+
+
+    /**
+     * Dispatches an incoming request by its request code.
+     *
+     * @param ctx     channel context the request arrived on
+     * @param request the decoded request
+     * @return the response of the filter class registration, or whatever the pull forwarding
+     *         returns; {@code null} for any other request code
+     * @throws Exception propagated from the individual handlers
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final int requestCode = request.getCode();
+
+        if (log.isDebugEnabled()) {
+            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.debug("receive request, {} {} {}", requestCode, remoteAddr, request);
+        }
+
+        if (RequestCode.REGISTER_MESSAGE_FILTER_CLASS == requestCode) {
+            return registerMessageFilterClass(ctx, request);
+        }
+
+        if (RequestCode.PULL_MESSAGE == requestCode) {
+            return pullMessageForward(ctx, request);
+        }
+
+        // Request codes other than the two above are not handled here.
+        return null;
+    }
+
+    /**
+     * Always returns {@code false}.
+     *
+     * NOTE(review): presumably the remoting layer uses this to decide whether requests should be
+     * turned away before processing - confirm against NettyRequestProcessor.
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Registers the filter class carried by the request with the filter class manager.
+     *
+     * @param ctx     channel context (not used by this handler)
+     * @param request request whose custom header names the consumer group, topic, class name and
+     *                class CRC, and whose body is passed on as the fifth registration argument
+     * @return a response with {@code SUCCESS}, or {@code SYSTEM_ERROR} plus a short exception
+     *         description when the registration reports failure or throws
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final RegisterMessageFilterClassRequestHeader header =
+            (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
+
+        try {
+            final boolean registered = this.filtersrvController.getFilterClassManager().registerFilterClass(
+                header.getConsumerGroup(),
+                header.getTopic(),
+                header.getClassName(),
+                header.getClassCRC(),
+                request.getBody());
+            if (!registered) {
+                // Routed through the catch block below so that both failure paths share one exit.
+                throw new Exception("registerFilterClass error");
+            }
+        } catch (Exception e) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
+            return response;
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Handles a pull request by pulling through the controller's pull consumer, running the
+     * registered filter class over the pulled messages and writing the response from the pull
+     * callback.
+     *
+     * @param ctx     channel context the response is eventually written to
+     * @param request the pull request
+     * @return an error response when no usable filter class is registered for the group/topic;
+     *         {@code null} otherwise, because the response is then written by the callback
+     * @throws Exception if decoding the request header or issuing the pull fails
+     */
+    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageRequestHeader requestHeader =
+            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+        final String group = requestHeader.getConsumerGroup();
+        final String topic = requestHeader.getTopic();
+
+        final FilterContext filterContext = new FilterContext();
+        filterContext.setConsumerGroup(group);
+
+        response.setOpaque(request.getOpaque());
+
+        final DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
+        final FilterClassInfo findFilterClass =
+            this.filtersrvController.getFilterClassManager().findFilterClass(group, topic);
+
+        // Both lookup failures end the request immediately with SYSTEM_ERROR.
+        String lookupFailure = null;
+        if (null == findFilterClass) {
+            lookupFailure = "Find Filter class failed, not registered";
+        } else if (null == findFilterClass.getMessageFilter()) {
+            lookupFailure = "Find Filter class failed, registered but no class";
+        }
+        if (lookupFailure != null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(lookupFailure);
+            return response;
+        }
+
+        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+
+        final MessageQueue mq = new MessageQueue();
+        mq.setTopic(topic);
+        mq.setQueueId(requestHeader.getQueueId());
+        mq.setBrokerName(this.filtersrvController.getBrokerName());
+
+        final long offset = requestHeader.getQueueOffset();
+        final int maxNums = requestHeader.getMaxMsgNums();
+
+        final PullCallback pullCallback = new PullCallback() {
+
+            @Override
+            public void onSuccess(PullResult pullResult) {
+                responseHeader.setMaxOffset(pullResult.getMaxOffset());
+                responseHeader.setMinOffset(pullResult.getMinOffset());
+                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
+                response.setRemark(null);
+
+                switch (pullResult.getPullStatus()) {
+                    case FOUND:
+                        response.setCode(ResponseCode.SUCCESS);
+
+                        try {
+                            final List<MessageExt> matched = new ArrayList<MessageExt>();
+                            for (MessageExt candidate : pullResult.getMsgFoundList()) {
+                                if (findFilterClass.getMessageFilter().match(candidate, filterContext)) {
+                                    matched.add(candidate);
+                                }
+                            }
+
+                            if (matched.isEmpty()) {
+                                // Everything was filtered out: ask the client to pull again right away.
+                                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                            } else {
+                                // Kept inside the try on purpose: a failure while building the
+                                // response is reported through the catch block below.
+                                returnResponse(group, topic, ctx, response, matched);
+                                return;
+                            }
+                        } catch (Throwable e) {
+                            final String error =
+                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", group, topic);
+                            log.error(error, e);
+
+                            response.setCode(ResponseCode.SYSTEM_ERROR);
+                            response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
+                            returnResponse(group, topic, ctx, response, null);
+                            return;
+                        }
+
+                        break;
+                    case NO_MATCHED_MSG:
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        break;
+                    case NO_NEW_MSG:
+                        response.setCode(ResponseCode.PULL_NOT_FOUND);
+                        break;
+                    case OFFSET_ILLEGAL:
+                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                        break;
+                    default:
+                        break;
+                }
+
+                // Every path that did not already answer above replies without a body.
+                returnResponse(group, topic, ctx, response, null);
+            }
+
+
+            @Override
+            public void onException(Throwable e) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
+                returnResponse(group, topic, ctx, response, null);
+            }
+        };
+
+        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
+
+        // The response is written by the callback.
+        return null;
+    }
+
+    /**
+     * Writes {@code response} to the channel, optionally attaching the encoded messages as body.
+     *
+     * <p>When {@code msgList} is non-null, every message is encoded with
+     * {@link #messageToByteBuffer(MessageExt)}, the encoded messages are concatenated into the
+     * response body, and the per-group statistics are updated. If any message cannot be encoded,
+     * no body is attached and the response is turned into a {@code SYSTEM_ERROR}; previously the
+     * failed slot stayed {@code null} and a NullPointerException was raised while assembling
+     * the body.
+     *
+     * @param group    consumer group, used for statistics and log messages
+     * @param topic    topic, used for statistics and log messages
+     * @param ctx      channel context to write the response to
+     * @param response response to send; its body, code and remark may be modified here
+     * @param msgList  messages to attach, or {@code null} to send the response without a body
+     */
+    private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
+        final List<MessageExt> msgList) {
+        if (null != msgList) {
+            final List<ByteBuffer> encodedMsgs = new ArrayList<ByteBuffer>(msgList.size());
+            int bodyTotalSize = 0;
+            boolean encodeOK = true;
+
+            for (MessageExt msg : msgList) {
+                try {
+                    final ByteBuffer encoded = messageToByteBuffer(msg);
+                    // Switch the freshly written buffer to read mode before measuring and copying it.
+                    encoded.flip();
+                    bodyTotalSize += encoded.remaining();
+                    encodedMsgs.add(encoded);
+                } catch (Exception e) {
+                    log.error(String.format("messageToByteBuffer Exception, ConsumerGroup: %s Topic: %s", group, topic), e);
+                    encodeOK = false;
+                    break;
+                }
+            }
+
+            if (encodeOK) {
+                final ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
+                for (ByteBuffer encoded : encodedMsgs) {
+                    body.put(encoded);
+                }
+
+                response.setBody(body.array());
+
+                this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size());
+
+                this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
+            } else {
+                // Report the failure explicitly rather than sending a partial batch.
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark(String.format("encode message failed, ConsumerGroup: %s Topic: %s", group, topic));
+            }
+        }
+
+        try {
+            ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (!future.isSuccess()) {
+                        log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause());
+                        log.error(response.toString());
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            log.error("FilterServer process request over, but response failed", e);
+            log.error(response.toString());
+        }
+    }
+
+    /**
+     * Serializes a message into a newly allocated buffer using the field layout listed below.
+     *
+     * <p>If the body is at least as long as the configured compression threshold it is compressed;
+     * when compression yields data, the message's body is replaced with it (the passed-in
+     * {@code msg} is modified) and the compressed flag is set in the written system flag.
+     *
+     * @param msg message to serialize
+     * @return the filled buffer, still in write mode (the caller has to flip it before reading)
+     * @throws IOException declared for the compression step
+     */
+    private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
+        int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
+
+        final byte[] rawBody = msg.getBody();
+        if (rawBody != null
+            && rawBody.length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
+            final byte[] compressed =
+                UtilAll.compress(rawBody, this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
+            if (compressed != null) {
+                msg.setBody(compressed);
+                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+            }
+        }
+
+        // Re-read the body: it may have been replaced by its compressed form above.
+        final byte[] body = msg.getBody();
+        final int bodyLength = (body == null) ? 0 : body.length;
+
+        final byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET);
+        final int topicLength = topicData.length;
+
+        final byte[] propertiesData =
+            MessageDecoder.messageProperties2String(msg.getProperties()).getBytes(MixAll.DEFAULT_CHARSET);
+        final int propertiesLength = propertiesData.length;
+
+        final int msgLen = 4 // 1 TOTALSIZE
+            + 4 // 2 MAGICCODE
+            + 4 // 3 BODYCRC
+            + 4 // 4 QUEUEID
+            + 4 // 5 FLAG
+            + 8 // 6 QUEUEOFFSET
+            + 8 // 7 PHYSICALOFFSET
+            + 4 // 8 SYSFLAG
+            + 8 // 9 BORNTIMESTAMP
+            + 8 // 10 BORNHOST
+            + 8 // 11 STORETIMESTAMP
+            + 8 // 12 STOREHOSTADDRESS
+            + 4 // 13 RECONSUMETIMES
+            + 8 // 14 Prepared Transaction Offset
+            + 4 + bodyLength // 15 BODY (length prefix + data)
+            + 1 + topicLength // 16 TOPIC (length prefix + data)
+            + 2 + propertiesLength; // 17 PROPERTIES (length prefix + data)
+
+        final ByteBuffer buffer = ByteBuffer.allocate(msgLen);
+
+        // 1 TOTALSIZE
+        buffer.putInt(msgLen);
+        // 2 MAGICCODE
+        buffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+        // 3 BODYCRC
+        buffer.putInt(UtilAll.crc32(body));
+        // 4 QUEUEID
+        buffer.putInt(msg.getQueueId());
+        // 5 FLAG
+        buffer.putInt(msg.getFlag());
+        // 6 QUEUEOFFSET
+        buffer.putLong(msg.getQueueOffset());
+        // 7 PHYSICALOFFSET
+        buffer.putLong(msg.getCommitLogOffset());
+        // 8 SYSFLAG
+        buffer.putInt(sysFlag);
+        // 9 BORNTIMESTAMP
+        buffer.putLong(msg.getBornTimestamp());
+        // 10 BORNHOST
+        buffer.put(msg.getBornHostBytes());
+        // 11 STORETIMESTAMP
+        buffer.putLong(msg.getStoreTimestamp());
+        // 12 STOREHOSTADDRESS
+        buffer.put(msg.getStoreHostBytes());
+        // 13 RECONSUMETIMES
+        buffer.putInt(msg.getReconsumeTimes());
+        // 14 Prepared Transaction Offset
+        buffer.putLong(msg.getPreparedTransactionOffset());
+        // 15 BODY
+        buffer.putInt(bodyLength);
+        if (bodyLength > 0) {
+            buffer.put(body);
+        }
+        // 16 TOPIC
+        buffer.put((byte) topicLength);
+        buffer.put(topicData);
+        // 17 PROPERTIES
+        buffer.putShort((short) propertiesLength);
+        if (propertiesLength > 0) {
+            buffer.put(propertiesData);
+        }
+
+        return buffer;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
new file mode 100644
index 0000000..8665fbd
--- /dev/null
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filtersrv.stats;
+
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Keeps per consumer-group/topic counters for the filter server: how many messages were
+ * returned and how many bytes they amounted to.
+ */
+public class FilterServerStatsManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    // Single-threaded scheduler shared by both stats sets below.
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    // ConsumerGroup Get Nums
+    private final StatsItemSet groupGetNums;
+
+    // ConsumerGroup Get Size
+    private final StatsItemSet groupGetSize;
+
+
+    public FilterServerStatsManager() {
+        // The scheduler has to exist before the stats sets, which receive it as an argument.
+        this.scheduledExecutorService =
+            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
+        this.groupGetNums = new StatsItemSet("GROUP_GET_NUMS", this.scheduledExecutorService, log);
+        this.groupGetSize = new StatsItemSet("GROUP_GET_SIZE", this.scheduledExecutorService, log);
+    }
+
+
+    /** Does nothing; the scheduler is already created by the constructor. */
+    public void start() {
+    }
+
+
+    /** Shuts down the scheduler shared by the stats sets. */
+    public void shutdown() {
+        this.scheduledExecutorService.shutdown();
+    }
+
+
+    /**
+     * Adds {@code incValue} to the message counter of the given topic/group pair.
+     *
+     * @param group    consumer group
+     * @param topic    topic
+     * @param incValue number of messages to add
+     */
+    public void incGroupGetNums(final String group, final String topic, final int incValue) {
+        this.groupGetNums.addValue(statsKey(group, topic), incValue, 1);
+    }
+
+
+    /**
+     * Adds {@code incValue} to the size counter of the given topic/group pair.
+     *
+     * @param group    consumer group
+     * @param topic    topic
+     * @param incValue number of bytes to add
+     */
+    public void incGroupGetSize(final String group, final String topic, final int incValue) {
+        this.groupGetSize.addValue(statsKey(group, topic), incValue, 1);
+    }
+
+
+    /** Builds the stats key in the form {@code topic@group}. */
+    private static String statsKey(final String group, final String topic) {
+        return topic + "@" + group;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 3494f8f..2ec2f5f 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -18,7 +18,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>com.alibaba.rocketmq</groupId>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.0.0-SNAPSHOT</version>
</parent>
@@ -35,11 +35,11 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.alibaba.rocketmq</groupId>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
- <groupId>com.alibaba.rocketmq</groupId>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
deleted file mode 100644
index 82f2622..0000000
--- a/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.namesrv;
-
-import com.alibaba.rocketmq.common.Configuration;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
-import com.alibaba.rocketmq.namesrv.kvconfig.KVConfigManager;
-import com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
-import com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor;
-import com.alibaba.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
-import com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager;
-import com.alibaba.rocketmq.remoting.RemotingServer;
-import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
-import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * Owns the name server's components and their lifecycle: the constructor creates the KV config
- * manager, route info manager, broker housekeeping service and configuration;
- * {@code initialize()} loads the KV config, creates the remoting server and its executor,
- * registers the request processor and schedules two periodic tasks; {@code start()} and
- * {@code shutdown()} start and stop the remoting server and executors.
- *
- * @author shijia.wxr
- */
-public class NamesrvController {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
-
- private final NamesrvConfig namesrvConfig;
-
- private final NettyServerConfig nettyServerConfig;
-
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "NSScheduledThread")); // single-threaded scheduler running the periodic tasks set up in initialize()
- private final KVConfigManager kvConfigManager;
- private final RouteInfoManager routeInfoManager;
-
- private RemotingServer remotingServer; // created in initialize(), not in the constructor
-
- private BrokerHousekeepingService brokerHousekeepingService;
-
- private ExecutorService remotingExecutor; // created in initialize(); handed to the remoting server with the processor
-
- private Configuration configuration;
-
-
- public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
- this.namesrvConfig = namesrvConfig;
- this.nettyServerConfig = nettyServerConfig;
- this.kvConfigManager = new KVConfigManager(this);
- this.routeInfoManager = new RouteInfoManager();
- this.brokerHousekeepingService = new BrokerHousekeepingService(this);
- this.configuration = new Configuration(
- log,
- this.namesrvConfig, this.nettyServerConfig
- );
- this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
- }
-
-
- public boolean initialize() { // always returns true
-
- this.kvConfigManager.load();
-
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
-
-
- this.remotingExecutor =
- Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
-
- this.registerProcessor();
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- NamesrvController.this.routeInfoManager.scanNotActiveBroker();
- }
- }, 5, 10, TimeUnit.SECONDS); // first run after 5 seconds, then every 10 seconds
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- NamesrvController.this.kvConfigManager.printAllPeriodically();
- }
- }, 1, 10, TimeUnit.MINUTES); // first run after 1 minute, then every 10 minutes
-
- return true;
- }
-
-
- private void registerProcessor() { // picks the default processor depending on the cluster-test setting
- if (namesrvConfig.isClusterTest()) {
-
- this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
- this.remotingExecutor);
- } else {
-
- this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
- }
- }
-
-
- public void start() throws Exception { // requires initialize() to have created remotingServer
- this.remotingServer.start();
- }
-
-
- public void shutdown() { // stops the remoting server first, then both executors
- this.remotingServer.shutdown();
- this.remotingExecutor.shutdown();
- this.scheduledExecutorService.shutdown();
- }
-
-
- public NamesrvConfig getNamesrvConfig() {
- return namesrvConfig;
- }
-
-
- public NettyServerConfig getNettyServerConfig() {
- return nettyServerConfig;
- }
-
-
- public KVConfigManager getKvConfigManager() {
- return kvConfigManager;
- }
-
-
- public RouteInfoManager getRouteInfoManager() {
- return routeInfoManager;
- }
-
-
- public RemotingServer getRemotingServer() {
- return remotingServer;
- }
-
-
- public void setRemotingServer(RemotingServer remotingServer) {
- this.remotingServer = remotingServer;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-}