You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:07 UTC
[76/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
index cda7952..1e13d39 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
@@ -6,28 +6,25 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.example.simple;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
public class RandomAsyncCommit {
private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable =
- new ConcurrentHashMap<MessageQueue, CachedQueue>();
-
+ new ConcurrentHashMap<MessageQueue, CachedQueue>();
public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) {
CachedQueue cachedQueue = this.mqCachedTable.get(mq);
@@ -40,7 +37,6 @@ public class RandomAsyncCommit {
}
}
-
public void removeMessage(final MessageQueue mq, long offset) {
CachedQueue cachedQueue = this.mqCachedTable.get(mq);
if (null != cachedQueue) {
@@ -48,7 +44,6 @@ public class RandomAsyncCommit {
}
}
-
public long commitableOffset(final MessageQueue mq) {
CachedQueue cachedQueue = this.mqCachedTable.get(mq);
if (null != cachedQueue) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
index 0304a63..8787fa8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
@@ -33,14 +33,14 @@ public class TestProducer {
try {
{
Message msg = new Message("TopicTest1",
- "TagA",
- "key113",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ "TagA",
+ "key113",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
QueryResult queryMessage =
- producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
+ producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
for (MessageExt m : queryMessage.getMessageList()) {
System.out.printf("%s%n", m);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
index fea93a8..1beed71 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
@@ -16,17 +16,14 @@
*/
package org.apache.rocketmq.example.transaction;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
public class TransactionCheckListenerImpl implements TransactionCheckListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
-
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.printf("server checking TrMsg " + msg.toString() + "%n");
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
index eb787fd..b767a4a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
@@ -6,26 +6,24 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.example.transaction;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class TransactionExecuterImpl implements LocalTransactionExecuter {
private AtomicInteger transactionIndex = new AtomicInteger(1);
-
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
index 5a868c6..1609a81 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.transaction;
+import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
@@ -23,8 +24,6 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import java.io.UnsupportedEncodingException;
-
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
@@ -35,13 +34,13 @@ public class TransactionProducer {
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
- String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+ String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
try {
Message msg =
- new Message("TopicTest", tags[i % tags.length], "KEY" + i,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+ new Message("TopicTest", tags[i % tags.length], "KEY" + i,
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.printf("%s%n", sendResult);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/resources/MessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java
index 3ff3f48..83ca00e 100644
--- a/example/src/main/resources/MessageFilterImpl.java
+++ b/example/src/main/resources/MessageFilterImpl.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.example.filter;
@@ -20,7 +20,6 @@ package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;
-
public class MessageFilterImpl implements MessageFilter {
@Override
@@ -29,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter {
if (property != null) {
int id = Integer.parseInt(property);
if (((id % 10) == 0) && //
- (id > 100)) {
+ (id > 100)) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/pom.xml
----------------------------------------------------------------------
diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml
index bebd10a..cf5388d 100644
--- a/filtersrv/pom.xml
+++ b/filtersrv/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
index bd16e0d..32b8bad 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
@@ -30,43 +30,38 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
public class FilterServerOuterAPI {
private final RemotingClient remotingClient;
-
public FilterServerOuterAPI() {
this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
}
-
public void start() {
this.remotingClient.start();
}
-
public void shutdown() {
this.remotingClient.shutdown();
}
-
public RegisterFilterServerResponseHeader registerFilterServerToBroker(
- final String brokerAddr,
- final String filterServerAddr
+ final String brokerAddr,
+ final String filterServerAddr
) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader();
requestHeader.setFilterServerAddr(filterServerAddr);
RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);
+ RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterFilterServerResponseHeader responseHeader =
- (RegisterFilterServerResponseHeader) response
- .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
+ (RegisterFilterServerResponseHeader)response
+ .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
return responseHeader;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
index ec0381d..ee2ebee 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv;
@@ -21,14 +21,13 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-
public class FiltersrvConfig {
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
- System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
- System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ System.getenv(MixAll.NAMESRV_ADDR_ENV));
private String connectWhichBroker = "127.0.0.1:10911";
private String filterServerIP = RemotingUtil.getLocalAddress();
@@ -36,122 +35,98 @@ public class FiltersrvConfig {
private int compressMsgBodyOverHowmuch = 1024 * 8;
private int zipCompressLevel = 5;
-
private boolean clientUploadFilterClassEnable = true;
-
private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass";
private int fsServerAsyncSemaphoreValue = 2048;
private int fsServerCallbackExecutorThreads = 64;
private int fsServerWorkerThreads = 64;
-
public String getRocketmqHome() {
return rocketmqHome;
}
-
public void setRocketmqHome(String rocketmqHome) {
this.rocketmqHome = rocketmqHome;
}
-
public String getNamesrvAddr() {
return namesrvAddr;
}
-
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
-
public String getConnectWhichBroker() {
return connectWhichBroker;
}
-
public void setConnectWhichBroker(String connectWhichBroker) {
this.connectWhichBroker = connectWhichBroker;
}
-
public String getFilterServerIP() {
return filterServerIP;
}
-
public void setFilterServerIP(String filterServerIP) {
this.filterServerIP = filterServerIP;
}
-
public int getCompressMsgBodyOverHowmuch() {
return compressMsgBodyOverHowmuch;
}
-
public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
}
-
public int getZipCompressLevel() {
return zipCompressLevel;
}
-
public void setZipCompressLevel(int zipCompressLevel) {
this.zipCompressLevel = zipCompressLevel;
}
-
public boolean isClientUploadFilterClassEnable() {
return clientUploadFilterClassEnable;
}
-
public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) {
this.clientUploadFilterClassEnable = clientUploadFilterClassEnable;
}
-
public String getFilterClassRepertoryUrl() {
return filterClassRepertoryUrl;
}
-
public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) {
this.filterClassRepertoryUrl = filterClassRepertoryUrl;
}
-
public int getFsServerAsyncSemaphoreValue() {
return fsServerAsyncSemaphoreValue;
}
-
public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) {
this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue;
}
-
public int getFsServerCallbackExecutorThreads() {
return fsServerCallbackExecutorThreads;
}
-
public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) {
this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads;
}
-
public int getFsServerWorkerThreads() {
return fsServerWorkerThreads;
}
-
public void setFsServerWorkerThreads(int fsServerWorkerThreads) {
this.fsServerWorkerThreads = fsServerWorkerThreads;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
index ca136e0..c46b613 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.filtersrv;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -31,12 +35,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
public class FiltersrvController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
@@ -47,10 +45,10 @@ public class FiltersrvController {
private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();
private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
- MixAll.FILTERSRV_CONSUMER_GROUP);
+ MixAll.FILTERSRV_CONSUMER_GROUP);
private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();
private RemotingServer remotingServer;
@@ -58,29 +56,24 @@ public class FiltersrvController {
private ExecutorService remotingExecutor;
private volatile String brokerName = null;
-
public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
this.filtersrvConfig = filtersrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.filterClassManager = new FilterClassManager(this);
}
-
public boolean initialize() {
MixAll.printObjectProperties(log, this.filtersrvConfig);
-
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);
-
this.remotingExecutor =
- Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
- new ThreadFactoryImpl("RemotingExecutorThread_"));
+ Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
+ new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -90,9 +83,9 @@ public class FiltersrvController {
}, 3, 10, TimeUnit.SECONDS);
this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer
- .getBrokerSuspendMaxTimeMillis() - 1000);
+ .getBrokerSuspendMaxTimeMillis() - 1000);
this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer
- .getConsumerTimeoutMillisWhenSuspend() - 1000);
+ .getConsumerTimeoutMillisWhenSuspend() - 1000);
this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));
@@ -102,26 +95,26 @@ public class FiltersrvController {
private void registerProcessor() {
this.remotingServer
- .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
+ .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
public void registerFilterServerToBroker() {
try {
RegisterFilterServerResponseHeader responseHeader =
- this.filterServerOuterAPI.registerFilterServerToBroker(
- this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
+ this.filterServerOuterAPI.registerFilterServerToBroker(
+ this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
- .setDefaultBrokerId(responseHeader.getBrokerId());
+ .setDefaultBrokerId(responseHeader.getBrokerId());
if (null == this.brokerName) {
this.brokerName = responseHeader.getBrokerName();
}
log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
- this.localAddr(),
- this.filtersrvConfig.getConnectWhichBroker(),
- responseHeader.getBrokerName(),
- responseHeader.getBrokerId());
+ this.localAddr(),
+ this.filtersrvConfig.getConnectWhichBroker(),
+ responseHeader.getBrokerName(),
+ responseHeader.getBrokerId());
} catch (Exception e) {
log.warn("register filter server Exception", e);
@@ -132,7 +125,7 @@ public class FiltersrvController {
public String localAddr() {
return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(),
- this.remotingServer.localListenPort());
+ this.remotingServer.localListenPort());
}
public void start() throws Exception {
@@ -140,12 +133,11 @@ public class FiltersrvController {
this.remotingServer.start();
this.filterServerOuterAPI.start();
this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
- .setConnectBrokerByUser(true);
+ .setConnectBrokerByUser(true);
this.filterClassManager.start();
this.filterServerStatsManager.start();
}
-
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
@@ -156,67 +148,54 @@ public class FiltersrvController {
this.filterServerStatsManager.shutdown();
}
-
public RemotingServer getRemotingServer() {
return remotingServer;
}
-
public void setRemotingServer(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
}
-
public ExecutorService getRemotingExecutor() {
return remotingExecutor;
}
-
public void setRemotingExecutor(ExecutorService remotingExecutor) {
this.remotingExecutor = remotingExecutor;
}
-
public FiltersrvConfig getFiltersrvConfig() {
return filtersrvConfig;
}
-
public NettyServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
-
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
-
public FilterServerOuterAPI getFilterServerOuterAPI() {
return filterServerOuterAPI;
}
-
public FilterClassManager getFilterClassManager() {
return filterClassManager;
}
-
public DefaultMQPullConsumer getDefaultMQPullConsumer() {
return defaultMQPullConsumer;
}
-
public String getBrokerName() {
return brokerName;
}
-
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
-
public FilterServerStatsManager getFilterServerStatsManager() {
return filterServerStatsManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
index 461c79c..f239caf 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
@@ -18,6 +18,15 @@ package org.apache.rocketmq.filtersrv;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -25,20 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
public class FiltersrvStartup {
public static Logger log;
@@ -65,12 +63,10 @@ public class FiltersrvStartup {
public static FiltersrvController createController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 65535;
}
-
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 1024;
}
@@ -78,8 +74,8 @@ public class FiltersrvStartup {
try {
Options options = ServerUtil.buildCommandlineOptions(new Options());
final CommandLine commandLine =
- ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
- new PosixParser());
+ ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
@@ -108,7 +104,7 @@ public class FiltersrvStartup {
nettyServerConfig.setListenPort(0);
nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue());
nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig
- .getFsServerCallbackExecutorThreads());
+ .getFsServerCallbackExecutorThreads());
nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads());
if (commandLine.hasOption('p')) {
@@ -120,11 +116,11 @@ public class FiltersrvStartup {
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);
if (null == filtersrvConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
- + " variable in your environment to match the location of the RocketMQ installation%n");
+ + " variable in your environment to match the location of the RocketMQ installation%n");
System.exit(-2);
}
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
@@ -132,7 +128,7 @@ public class FiltersrvStartup {
log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
final FiltersrvController controller =
- new FiltersrvController(filtersrvConfig, nettyServerConfig);
+ new FiltersrvController(filtersrvConfig, nettyServerConfig);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
index fd95685..11102d0 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
@@ -6,35 +6,43 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.filter;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
-import java.io.*;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.util.*;
-
-
public class DynaCode {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
@@ -43,46 +51,35 @@ public class DynaCode {
private static final String LINE_SP = System.getProperty("line.separator");
private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP
- + UtilAll.getPid();
+ + UtilAll.getPid();
private String outPutClassPath = sourcePath;
-
private ClassLoader parentClassLoader;
-
private List<String> codeStrs;
-
private Map<String/* fullClassName */, Class<?>/* class */> loadClass;
-
private String classpath;
-
private String bootclasspath;
-
private String extdirs;
-
private String encoding = "UTF-8";
-
private String target;
-
@SuppressWarnings("unchecked")
public DynaCode(String code) {
this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code));
}
-
public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) {
this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs);
}
-
public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) {
this.classpath = classpath;
this.parentClassLoader = parentClassLoader;
@@ -90,12 +87,15 @@ public class DynaCode {
this.loadClass = new HashMap<String, Class<?>>(codeStrs.size());
}
+ public DynaCode(List<String> codeStrs) {
+ this(Thread.currentThread().getContextClassLoader(), codeStrs);
+ }
private static String extractClasspath(ClassLoader cl) {
StringBuffer buf = new StringBuffer();
while (cl != null) {
if (cl instanceof URLClassLoader) {
- URL urls[] = ((URLClassLoader) cl).getURLs();
+ URL urls[] = ((URLClassLoader)cl).getURLs();
for (int i = 0; i < urls.length; i++) {
if (buf.length() > 0) {
buf.append(File.pathSeparatorChar);
@@ -115,13 +115,8 @@ public class DynaCode {
return buf.toString();
}
-
- public DynaCode(List<String> codeStrs) {
- this(Thread.currentThread().getContextClassLoader(), codeStrs);
- }
-
public static Class<?> compileAndLoadClass(final String className, final String javaSource)
- throws Exception {
+ throws Exception {
String classSimpleName = FilterAPI.simpleClassName(className);
String javaCode = javaSource;
@@ -138,16 +133,6 @@ public class DynaCode {
return clazz;
}
- public void compileAndLoadClass() throws Exception {
- String[] sourceFiles = this.uploadSrcFile();
- this.compile(sourceFiles);
- this.loadClass(this.loadClass.keySet());
- }
-
- public Map<String, Class<?>> getLoadClass() {
- return loadClass;
- }
-
public static String getQualifiedName(String code) {
StringBuilder sb = new StringBuilder();
String className = getClassName(code);
@@ -162,6 +147,57 @@ public class DynaCode {
return sb.toString();
}
+ public static String getClassName(String code) {
+ String className = StringUtils.substringBefore(code, "{");
+ if (StringUtils.isBlank(className)) {
+ return className;
+ }
+ if (StringUtils.contains(code, " class ")) {
+ className = StringUtils.substringAfter(className, " class ");
+ if (StringUtils.contains(className, " extends ")) {
+ className = StringUtils.substringBefore(className, " extends ").trim();
+ } else if (StringUtils.contains(className, " implements ")) {
+ className = StringUtils.trim(StringUtils.substringBefore(className, " implements "));
+ } else {
+ className = StringUtils.trim(className);
+ }
+ } else if (StringUtils.contains(code, " interface ")) {
+ className = StringUtils.substringAfter(className, " interface ");
+ if (StringUtils.contains(className, " extends ")) {
+ className = StringUtils.substringBefore(className, " extends ").trim();
+ } else {
+ className = StringUtils.trim(className);
+ }
+ } else if (StringUtils.contains(code, " enum ")) {
+ className = StringUtils.trim(StringUtils.substringAfter(className, " enum "));
+ } else {
+ return StringUtils.EMPTY;
+ }
+ return className;
+ }
+
+ public static String getPackageName(String code) {
+ String packageName =
+ StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim();
+ return packageName;
+ }
+
+ public static String getFullClassName(String code) {
+ String packageName = getPackageName(code);
+ String className = getClassName(code);
+ return StringUtils.isBlank(packageName) ? className : packageName + "." + className;
+ }
+
+ public void compileAndLoadClass() throws Exception {
+ String[] sourceFiles = this.uploadSrcFile();
+ this.compile(sourceFiles);
+ this.loadClass(this.loadClass.keySet());
+ }
+
+ public Map<String, Class<?>> getLoadClass() {
+ return loadClass;
+ }
+
private String[] uploadSrcFile() throws Exception {
List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size());
for (String code : codeStrs) {
@@ -201,7 +237,7 @@ public class DynaCode {
srcFile.deleteOnExit();
}
OutputStreamWriter outputStreamWriter =
- new OutputStreamWriter(new FileOutputStream(srcFile), encoding);
+ new OutputStreamWriter(new FileOutputStream(srcFile), encoding);
bufferWriter = new BufferedWriter(outputStreamWriter);
for (String lineCode : code.split(LINE_SP)) {
bufferWriter.write(lineCode);
@@ -225,7 +261,7 @@ public class DynaCode {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
if (compiler == null) {
throw new NullPointerException(
- "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!");
+ "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!");
}
int resultCode = compiler.run(null, null, err, args);
if (resultCode != 0) {
@@ -236,8 +272,8 @@ public class DynaCode {
private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException {
synchronized (loadClass) {
ClassLoader classLoader =
- new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()},
- parentClassLoader);
+ new URLClassLoader(new URL[] {new File(outPutClassPath).toURI().toURL()},
+ parentClassLoader);
for (String key : classFullNames) {
Class<?> classz = classLoader.loadClass(key);
if (null != classz) {
@@ -250,47 +286,6 @@ public class DynaCode {
}
}
- public static String getClassName(String code) {
- String className = StringUtils.substringBefore(code, "{");
- if (StringUtils.isBlank(className)) {
- return className;
- }
- if (StringUtils.contains(code, " class ")) {
- className = StringUtils.substringAfter(className, " class ");
- if (StringUtils.contains(className, " extends ")) {
- className = StringUtils.substringBefore(className, " extends ").trim();
- } else if (StringUtils.contains(className, " implements ")) {
- className = StringUtils.trim(StringUtils.substringBefore(className, " implements "));
- } else {
- className = StringUtils.trim(className);
- }
- } else if (StringUtils.contains(code, " interface ")) {
- className = StringUtils.substringAfter(className, " interface ");
- if (StringUtils.contains(className, " extends ")) {
- className = StringUtils.substringBefore(className, " extends ").trim();
- } else {
- className = StringUtils.trim(className);
- }
- } else if (StringUtils.contains(code, " enum ")) {
- className = StringUtils.trim(StringUtils.substringAfter(className, " enum "));
- } else {
- return StringUtils.EMPTY;
- }
- return className;
- }
-
- public static String getPackageName(String code) {
- String packageName =
- StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim();
- return packageName;
- }
-
- public static String getFullClassName(String code) {
- String packageName = getPackageName(code);
- String className = getClassName(code);
- return StringUtils.isBlank(packageName) ? className : packageName + "." + className;
- }
-
private String[] buildCompileJavacArgs(String srcFiles[]) {
ArrayList<String> args = new ArrayList<String>();
if (StringUtils.isNotBlank(classpath)) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
index 36d6b7e..89f1883 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.filter;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
index d278fe3..1cb0e96 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
@@ -6,51 +6,44 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.filter;
import org.apache.rocketmq.common.filter.MessageFilter;
-
public class FilterClassInfo {
private String className;
private int classCRC;
private MessageFilter messageFilter;
-
public int getClassCRC() {
return classCRC;
}
-
public void setClassCRC(int classCRC) {
this.classCRC = classCRC;
}
-
public MessageFilter getMessageFilter() {
return messageFilter;
}
-
public void setMessageFilter(MessageFilter messageFilter) {
this.messageFilter = messageFilter;
}
-
public String getClassName() {
return className;
}
-
public void setClassName(String className) {
this.className = className;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
index 3269852..32f5ac2 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.filter;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
index fab4d7d..66389e0 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
@@ -6,17 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.filter;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
@@ -26,14 +32,6 @@ import org.apache.rocketmq.filtersrv.FiltersrvController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
public class FilterClassManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
@@ -41,19 +39,21 @@ public class FilterClassManager {
private final FiltersrvController filtersrvController;
private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
- new ConcurrentHashMap<String, FilterClassInfo>(128);
+ new ConcurrentHashMap<String, FilterClassInfo>(128);
private FilterClassFetchMethod filterClassFetchMethod;
-
public FilterClassManager(FiltersrvController filtersrvController) {
this.filtersrvController = filtersrvController;
this.filterClassFetchMethod =
- new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
- .getFilterClassRepertoryUrl());
+ new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
+ .getFilterClassRepertoryUrl());
}
+ private static String buildKey(final String consumerGroup, final String topic) {
+ return topic + "@" + consumerGroup;
+ }
public void start() {
if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
@@ -75,20 +75,20 @@ public class FilterClassManager {
FilterClassInfo filterClassInfo = next.getValue();
String[] topicAndGroup = next.getKey().split("@");
String responseStr =
- this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
- filterClassInfo.getClassName());
+ this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
+ filterClassInfo.getClassName());
byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8"));
if (classCRC != filterClassInfo.getClassCRC()) {
String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
Class<?> newClass =
- DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
+ DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
Object newInstance = newClass.newInstance();
- filterClassInfo.setMessageFilter((MessageFilter) newInstance);
+ filterClassInfo.setMessageFilter((MessageFilter)newInstance);
filterClassInfo.setClassCRC(classCRC);
log.info("fetch Remote class File OK, {} {}", next.getKey(),
- filterClassInfo.getClassName());
+ filterClassInfo.getClassName());
}
} catch (Exception e) {
log.error("fetchClassFromRemoteHost Exception", e);
@@ -101,10 +101,9 @@ public class FilterClassManager {
}
public boolean registerFilterClass(final String consumerGroup, final String topic,
- final String className, final int classCRC, final byte[] filterSourceBinary) {
+ final String className, final int classCRC, final byte[] filterSourceBinary) {
final String key = buildKey(consumerGroup, topic);
-
boolean registerNew = false;
FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
if (null == filterClassInfoPrev) {
@@ -135,17 +134,17 @@ public class FilterClassManager {
String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
Object newInstance = newClass.newInstance();
- filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
+ filterClassInfoNew.setMessageFilter((MessageFilter)newInstance);
filterClassInfoNew.setClassCRC(classCRC);
}
this.filterClassTable.put(key, filterClassInfoNew);
} catch (Throwable e) {
String info =
- String
- .format(
- "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
- consumerGroup, topic, className);
+ String
+ .format(
+ "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
+ consumerGroup, topic, className);
log.error(info, e);
return false;
}
@@ -155,20 +154,14 @@ public class FilterClassManager {
return true;
}
- private static String buildKey(final String consumerGroup, final String topic) {
- return topic + "@" + consumerGroup;
- }
-
public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) {
return this.filterClassTable.get(buildKey(consumerGroup, topic));
}
-
public FilterClassFetchMethod getFilterClassFetchMethod() {
return filterClassFetchMethod;
}
-
public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) {
this.filterClassFetchMethod = filterClassFetchMethod;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
index c8b1515..99bfad0 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.filter;
@@ -23,17 +23,14 @@ import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final String url;
-
public HttpFilterClassFetchMethod(String url) {
this.url = url;
}
-
@Override
public String fetch(String topic, String consumerGroup, String className) {
String thisUrl = String.format("%s/%s.java", this.url, className);
@@ -45,7 +42,7 @@ public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
}
} catch (Exception e) {
log.error(
- String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
+ String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
index f2c98ae..1d56ac1 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.filtersrv.processor;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -39,36 +46,25 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.CommitLog;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final FiltersrvController filtersrvController;
-
public DefaultRequestProcessor(FiltersrvController filtersrvController) {
this.filtersrvController = filtersrvController;
}
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
- request.getCode(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- request);
+ request.getCode(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ request);
}
switch (request.getCode()) {
@@ -89,14 +85,14 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final RegisterMessageFilterClassRequestHeader requestHeader =
- (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
+ (RegisterMessageFilterClassRequestHeader)request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
try {
boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(),
- requestHeader.getClassName(),
- requestHeader.getClassCRC(),
- request.getBody());
+ requestHeader.getTopic(),
+ requestHeader.getClassName(),
+ requestHeader.getClassCRC(),
+ request.getBody());
if (!ok) {
throw new Exception("registerFilterClass error");
}
@@ -113,20 +109,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
- final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+ final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
- (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+ (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
final FilterContext filterContext = new FilterContext();
filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
-
response.setOpaque(request.getOpaque());
DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
final FilterClassInfo findFilterClass =
- this.filtersrvController.getFilterClassManager()
- .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ this.filtersrvController.getFilterClassManager()
+ .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
if (null == findFilterClass) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, not registered");
@@ -141,7 +136,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
-
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
@@ -171,7 +165,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
}
-
if (!msgListOK.isEmpty()) {
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
return;
@@ -180,8 +173,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
} catch (Throwable e) {
final String error =
- String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
- requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
+ requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.error(error, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -207,7 +200,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
}
-
@Override
public void onException(Throwable e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -223,7 +215,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
- final List<MessageExt> msgList) {
+ final List<MessageExt> msgList) {
if (null != msgList) {
ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
int bodyTotalSize = 0;
@@ -244,7 +236,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
response.setBody(body.array());
-
this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size());
this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
@@ -285,23 +276,23 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET);
final int propertiesLength = propertiesData.length;
final int msgLen = 4 // 1 TOTALSIZE
- + 4 // 2 MAGICCODE
- + 4 // 3 BODYCRC
- + 4 // 4 QUEUEID
- + 4 // 5 FLAG
- + 8 // 6 QUEUEOFFSET
- + 8 // 7 PHYSICALOFFSET
- + 4 // 8 SYSFLAG
- + 8 // 9 BORNTIMESTAMP
- + 8 // 10 BORNHOST
- + 8 // 11 STORETIMESTAMP
- + 8 // 12 STOREHOSTADDRESS
- + 4 // 13 RECONSUMETIMES
- + 8 // 14 Prepared Transaction Offset
- + 4 + bodyLength // 14 BODY
- + 1 + topicLength // 15 TOPIC
- + 2 + propertiesLength // 16 propertiesLength
- + 0;
+ + 4 // 2 MAGICCODE
+ + 4 // 3 BODYCRC
+ + 4 // 4 QUEUEID
+ + 4 // 5 FLAG
+ + 8 // 6 QUEUEOFFSET
+ + 8 // 7 PHYSICALOFFSET
+ + 4 // 8 SYSFLAG
+ + 8 // 9 BORNTIMESTAMP
+ + 8 // 10 BORNHOST
+ + 8 // 11 STORETIMESTAMP
+ + 8 // 12 STOREHOSTADDRESS
+ + 4 // 13 RECONSUMETIMES
+ + 8 // 14 Prepared Transaction Offset
+ + 4 + bodyLength // 14 BODY
+ + 1 + topicLength // 15 TOPIC
+ + 2 + propertiesLength // 16 propertiesLength
+ + 0;
ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
@@ -340,10 +331,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
if (bodyLength > 0)
msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
- msgStoreItemMemory.put((byte) topicLength);
+ msgStoreItemMemory.put((byte)topicLength);
msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
- msgStoreItemMemory.putShort((short) propertiesLength);
+ msgStoreItemMemory.putShort((short)propertiesLength);
if (propertiesLength > 0)
msgStoreItemMemory.put(propertiesData);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
index 8665fbd..4f44e99 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
@@ -6,59 +6,52 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.filtersrv.stats;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-
public class FilterServerStatsManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
// ConsumerGroup Get Nums
private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS",
- this.scheduledExecutorService, log);
+ this.scheduledExecutorService, log);
// ConsumerGroup Get Size
private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE",
- this.scheduledExecutorService, log);
-
+ this.scheduledExecutorService, log);
public FilterServerStatsManager() {
}
-
public void start() {
}
-
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
-
public void incGroupGetNums(final String group, final String topic, final int incValue) {
this.groupGetNums.addValue(topic + "@" + group, incValue, 1);
}
-
public void incGroupGetSize(final String group, final String topic, final int incValue) {
this.groupGetSize.addValue(topic + "@" + group, incValue, 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 2ec2f5f..27a1c84 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 4c286e0..b212adb 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -6,16 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.namesrv;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,12 +35,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
public class NamesrvController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
@@ -45,7 +43,7 @@ public class NamesrvController {
private final NettyServerConfig nettyServerConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "NSScheduledThread"));
+ "NSScheduledThread"));
private final KVConfigManager kvConfigManager;
private final RouteInfoManager routeInfoManager;
@@ -57,7 +55,6 @@ public class NamesrvController {
private Configuration configuration;
-
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
@@ -65,26 +62,23 @@ public class NamesrvController {
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
- log,
- this.namesrvConfig, this.nettyServerConfig
+ log,
+ this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
-
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
-
this.remotingExecutor =
- Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
+ Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -104,56 +98,47 @@ public class NamesrvController {
return true;
}
-
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
- this.remotingExecutor);
+ this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
-
public void start() throws Exception {
this.remotingServer.start();
}
-
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
}
-
public NamesrvConfig getNamesrvConfig() {
return namesrvConfig;
}
-
public NettyServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
-
public KVConfigManager getKvConfigManager() {
return kvConfigManager;
}
-
public RouteInfoManager getRouteInfoManager() {
return routeInfoManager;
}
-
public RemotingServer getRemotingServer() {
return remotingServer;
}
-
public void setRemotingServer(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index be824cd..0eb9a52 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -18,6 +18,15 @@ package org.apache.rocketmq.namesrv;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -26,20 +35,9 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
public class NamesrvStartup {
public static Properties properties = null;
public static CommandLine commandLine = null;
@@ -51,12 +49,10 @@ public class NamesrvStartup {
public static NamesrvController main0(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 4096;
}
-
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 4096;
}
@@ -66,14 +62,13 @@ public class NamesrvStartup {
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine =
- ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
- new PosixParser());
+ ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
-
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
@@ -93,7 +88,6 @@ public class NamesrvStartup {
}
}
-
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
@@ -104,22 +98,20 @@ public class NamesrvStartup {
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
- + " variable in your environment to match the location of the RocketMQ installation%n");
+ + " variable in your environment to match the location of the RocketMQ installation%n");
System.exit(-2);
}
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
-
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
-
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
@@ -135,7 +127,6 @@ public class NamesrvStartup {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
-
@Override
public void run() {
synchronized (this) {
@@ -151,7 +142,6 @@ public class NamesrvStartup {
}
}, "ShutdownHook"));
-
controller.start();
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();