You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:32 UTC

[25/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
index cda7952..1e13d39 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java
@@ -6,28 +6,25 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.example.simple;
 
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 
 public class RandomAsyncCommit {
     private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable =
-            new ConcurrentHashMap<MessageQueue, CachedQueue>();
-
+        new ConcurrentHashMap<MessageQueue, CachedQueue>();
 
     public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) {
         CachedQueue cachedQueue = this.mqCachedTable.get(mq);
@@ -40,7 +37,6 @@ public class RandomAsyncCommit {
         }
     }
 
-
     public void removeMessage(final MessageQueue mq, long offset) {
         CachedQueue cachedQueue = this.mqCachedTable.get(mq);
         if (null != cachedQueue) {
@@ -48,7 +44,6 @@ public class RandomAsyncCommit {
         }
     }
 
-
     public long commitableOffset(final MessageQueue mq) {
         CachedQueue cachedQueue = this.mqCachedTable.get(mq);
         if (null != cachedQueue) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
index 0304a63..8787fa8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java
@@ -33,14 +33,14 @@ public class TestProducer {
             try {
                 {
                     Message msg = new Message("TopicTest1",
-                            "TagA",
-                            "key113",
-                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                        "TagA",
+                        "key113",
+                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                     SendResult sendResult = producer.send(msg);
                     System.out.printf("%s%n", sendResult);
 
                     QueryResult queryMessage =
-                            producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
+                        producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
                     for (MessageExt m : queryMessage.getMessageList()) {
                         System.out.printf("%s%n", m);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
index fea93a8..1beed71 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java
@@ -16,17 +16,14 @@
  */
 package org.apache.rocketmq.example.transaction;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.TransactionCheckListener;
 import org.apache.rocketmq.common.message.MessageExt;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class TransactionCheckListenerImpl implements TransactionCheckListener {
     private AtomicInteger transactionIndex = new AtomicInteger(0);
 
-
     @Override
     public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
         System.out.printf("server checking TrMsg " + msg.toString() + "%n");

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
index eb787fd..b767a4a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java
@@ -6,26 +6,24 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.example.transaction;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.common.message.Message;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class TransactionExecuterImpl implements LocalTransactionExecuter {
     private AtomicInteger transactionIndex = new AtomicInteger(1);
 
-
     @Override
     public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
         int value = transactionIndex.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
index 5a868c6..1609a81 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.example.transaction;
 
+import java.io.UnsupportedEncodingException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.TransactionCheckListener;
@@ -23,8 +24,6 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
-import java.io.UnsupportedEncodingException;
-
 public class TransactionProducer {
     public static void main(String[] args) throws MQClientException, InterruptedException {
         TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
@@ -35,13 +34,13 @@ public class TransactionProducer {
         producer.setTransactionCheckListener(transactionCheckListener);
         producer.start();
 
-        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
         TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
         for (int i = 0; i < 100; i++) {
             try {
                 Message msg =
-                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
-                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
+                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                 System.out.printf("%s%n", sendResult);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/resources/MessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java
index 3ff3f48..83ca00e 100644
--- a/example/src/main/resources/MessageFilterImpl.java
+++ b/example/src/main/resources/MessageFilterImpl.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.example.filter;
@@ -20,7 +20,6 @@ package org.apache.rocketmq.example.filter;
 import org.apache.rocketmq.common.filter.MessageFilter;
 import org.apache.rocketmq.common.message.MessageExt;
 
-
 public class MessageFilterImpl implements MessageFilter {
 
     @Override
@@ -29,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter {
         if (property != null) {
             int id = Integer.parseInt(property);
             if (((id % 10) == 0) && //
-                    (id > 100)) {
+                (id > 100)) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/pom.xml
----------------------------------------------------------------------
diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml
index bebd10a..cf5388d 100644
--- a/filtersrv/pom.xml
+++ b/filtersrv/pom.xml
@@ -15,7 +15,7 @@
    limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
index bd16e0d..32b8bad 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java
@@ -30,43 +30,38 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-
 public class FilterServerOuterAPI {
     private final RemotingClient remotingClient;
 
-
     public FilterServerOuterAPI() {
         this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
     }
 
-
     public void start() {
         this.remotingClient.start();
     }
 
-
     public void shutdown() {
         this.remotingClient.shutdown();
     }
 
-
     public RegisterFilterServerResponseHeader registerFilterServerToBroker(
-            final String brokerAddr,
-            final String filterServerAddr
+        final String brokerAddr,
+        final String filterServerAddr
     ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingTimeoutException, InterruptedException, MQBrokerException {
         RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader();
         requestHeader.setFilterServerAddr(filterServerAddr);
         RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);
+            RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 RegisterFilterServerResponseHeader responseHeader =
-                        (RegisterFilterServerResponseHeader) response
-                                .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
+                    (RegisterFilterServerResponseHeader)response
+                        .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
 
                 return responseHeader;
             }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
index ec0381d..ee2ebee 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv;
@@ -21,14 +21,13 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 
-
 public class FiltersrvConfig {
     private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
-            System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+        System.getenv(MixAll.ROCKETMQ_HOME_ENV));
 
     @ImportantField
     private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
-            System.getenv(MixAll.NAMESRV_ADDR_ENV));
+        System.getenv(MixAll.NAMESRV_ADDR_ENV));
 
     private String connectWhichBroker = "127.0.0.1:10911";
     private String filterServerIP = RemotingUtil.getLocalAddress();
@@ -36,122 +35,98 @@ public class FiltersrvConfig {
     private int compressMsgBodyOverHowmuch = 1024 * 8;
     private int zipCompressLevel = 5;
 
-
     private boolean clientUploadFilterClassEnable = true;
 
-
     private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass";
 
     private int fsServerAsyncSemaphoreValue = 2048;
     private int fsServerCallbackExecutorThreads = 64;
     private int fsServerWorkerThreads = 64;
 
-
     public String getRocketmqHome() {
         return rocketmqHome;
     }
 
-
     public void setRocketmqHome(String rocketmqHome) {
         this.rocketmqHome = rocketmqHome;
     }
 
-
     public String getNamesrvAddr() {
         return namesrvAddr;
     }
 
-
     public void setNamesrvAddr(String namesrvAddr) {
         this.namesrvAddr = namesrvAddr;
     }
 
-
     public String getConnectWhichBroker() {
         return connectWhichBroker;
     }
 
-
     public void setConnectWhichBroker(String connectWhichBroker) {
         this.connectWhichBroker = connectWhichBroker;
     }
 
-
     public String getFilterServerIP() {
         return filterServerIP;
     }
 
-
     public void setFilterServerIP(String filterServerIP) {
         this.filterServerIP = filterServerIP;
     }
 
-
     public int getCompressMsgBodyOverHowmuch() {
         return compressMsgBodyOverHowmuch;
     }
 
-
     public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
         this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
     }
 
-
     public int getZipCompressLevel() {
         return zipCompressLevel;
     }
 
-
     public void setZipCompressLevel(int zipCompressLevel) {
         this.zipCompressLevel = zipCompressLevel;
     }
 
-
     public boolean isClientUploadFilterClassEnable() {
         return clientUploadFilterClassEnable;
     }
 
-
     public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) {
         this.clientUploadFilterClassEnable = clientUploadFilterClassEnable;
     }
 
-
     public String getFilterClassRepertoryUrl() {
         return filterClassRepertoryUrl;
     }
 
-
     public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) {
         this.filterClassRepertoryUrl = filterClassRepertoryUrl;
     }
 
-
     public int getFsServerAsyncSemaphoreValue() {
         return fsServerAsyncSemaphoreValue;
     }
 
-
     public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) {
         this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue;
     }
 
-
     public int getFsServerCallbackExecutorThreads() {
         return fsServerCallbackExecutorThreads;
     }
 
-
     public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) {
         this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads;
     }
 
-
     public int getFsServerWorkerThreads() {
         return fsServerWorkerThreads;
     }
 
-
     public void setFsServerWorkerThreads(int fsServerWorkerThreads) {
         this.fsServerWorkerThreads = fsServerWorkerThreads;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
index ca136e0..c46b613 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java
@@ -16,6 +16,10 @@
  */
 package org.apache.rocketmq.filtersrv;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -31,12 +35,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
 public class FiltersrvController {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
 
@@ -47,10 +45,10 @@ public class FiltersrvController {
 
     private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();
     private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
-            MixAll.FILTERSRV_CONSUMER_GROUP);
+        MixAll.FILTERSRV_CONSUMER_GROUP);
 
     private final ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
     private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();
 
     private RemotingServer remotingServer;
@@ -58,29 +56,24 @@ public class FiltersrvController {
     private ExecutorService remotingExecutor;
     private volatile String brokerName = null;
 
-
     public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
         this.filtersrvConfig = filtersrvConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.filterClassManager = new FilterClassManager(this);
     }
 
-
     public boolean initialize() {
 
         MixAll.printObjectProperties(log, this.filtersrvConfig);
 
-
         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);
 
-
         this.remotingExecutor =
-                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
-                        new ThreadFactoryImpl("RemotingExecutorThread_"));
+            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
+                new ThreadFactoryImpl("RemotingExecutorThread_"));
 
         this.registerProcessor();
 
-
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
@@ -90,9 +83,9 @@ public class FiltersrvController {
         }, 3, 10, TimeUnit.SECONDS);
 
         this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer
-                .getBrokerSuspendMaxTimeMillis() - 1000);
+            .getBrokerSuspendMaxTimeMillis() - 1000);
         this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer
-                .getConsumerTimeoutMillisWhenSuspend() - 1000);
+            .getConsumerTimeoutMillisWhenSuspend() - 1000);
 
         this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
         this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));
@@ -102,26 +95,26 @@ public class FiltersrvController {
 
     private void registerProcessor() {
         this.remotingServer
-                .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
+            .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
     }
 
     public void registerFilterServerToBroker() {
         try {
             RegisterFilterServerResponseHeader responseHeader =
-                    this.filterServerOuterAPI.registerFilterServerToBroker(
-                            this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
+                this.filterServerOuterAPI.registerFilterServerToBroker(
+                    this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
             this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
-                    .setDefaultBrokerId(responseHeader.getBrokerId());
+                .setDefaultBrokerId(responseHeader.getBrokerId());
 
             if (null == this.brokerName) {
                 this.brokerName = responseHeader.getBrokerName();
             }
 
             log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
-                    this.localAddr(),
-                    this.filtersrvConfig.getConnectWhichBroker(),
-                    responseHeader.getBrokerName(),
-                    responseHeader.getBrokerId());
+                this.localAddr(),
+                this.filtersrvConfig.getConnectWhichBroker(),
+                responseHeader.getBrokerName(),
+                responseHeader.getBrokerId());
         } catch (Exception e) {
             log.warn("register filter server Exception", e);
 
@@ -132,7 +125,7 @@ public class FiltersrvController {
 
     public String localAddr() {
         return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(),
-                this.remotingServer.localListenPort());
+            this.remotingServer.localListenPort());
     }
 
     public void start() throws Exception {
@@ -140,12 +133,11 @@ public class FiltersrvController {
         this.remotingServer.start();
         this.filterServerOuterAPI.start();
         this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
-                .setConnectBrokerByUser(true);
+            .setConnectBrokerByUser(true);
         this.filterClassManager.start();
         this.filterServerStatsManager.start();
     }
 
-
     public void shutdown() {
         this.remotingServer.shutdown();
         this.remotingExecutor.shutdown();
@@ -156,67 +148,54 @@ public class FiltersrvController {
         this.filterServerStatsManager.shutdown();
     }
 
-
     public RemotingServer getRemotingServer() {
         return remotingServer;
     }
 
-
     public void setRemotingServer(RemotingServer remotingServer) {
         this.remotingServer = remotingServer;
     }
 
-
     public ExecutorService getRemotingExecutor() {
         return remotingExecutor;
     }
 
-
     public void setRemotingExecutor(ExecutorService remotingExecutor) {
         this.remotingExecutor = remotingExecutor;
     }
 
-
     public FiltersrvConfig getFiltersrvConfig() {
         return filtersrvConfig;
     }
 
-
     public NettyServerConfig getNettyServerConfig() {
         return nettyServerConfig;
     }
 
-
     public ScheduledExecutorService getScheduledExecutorService() {
         return scheduledExecutorService;
     }
 
-
     public FilterServerOuterAPI getFilterServerOuterAPI() {
         return filterServerOuterAPI;
     }
 
-
     public FilterClassManager getFilterClassManager() {
         return filterClassManager;
     }
 
-
     public DefaultMQPullConsumer getDefaultMQPullConsumer() {
         return defaultMQPullConsumer;
     }
 
-
     public String getBrokerName() {
         return brokerName;
     }
 
-
     public void setBrokerName(String brokerName) {
         this.brokerName = brokerName;
     }
 
-
     public FilterServerStatsManager getFilterServerStatsManager() {
         return filterServerStatsManager;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
index 461c79c..f239caf 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java
@@ -18,6 +18,15 @@ package org.apache.rocketmq.filtersrv;
 
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -25,20 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class FiltersrvStartup {
     public static Logger log;
 
@@ -65,12 +63,10 @@ public class FiltersrvStartup {
     public static FiltersrvController createController(String[] args) {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
 
-
         if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
             NettySystemConfig.socketSndbufSize = 65535;
         }
 
-
         if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
             NettySystemConfig.socketRcvbufSize = 1024;
         }
@@ -78,8 +74,8 @@ public class FiltersrvStartup {
         try {
             Options options = ServerUtil.buildCommandlineOptions(new Options());
             final CommandLine commandLine =
-                    ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
-                            new PosixParser());
+                ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
+                    new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
                 return null;
@@ -108,7 +104,7 @@ public class FiltersrvStartup {
             nettyServerConfig.setListenPort(0);
             nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue());
             nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig
-                    .getFsServerCallbackExecutorThreads());
+                .getFsServerCallbackExecutorThreads());
             nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads());
 
             if (commandLine.hasOption('p')) {
@@ -120,11 +116,11 @@ public class FiltersrvStartup {
             MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);
             if (null == filtersrvConfig.getRocketmqHome()) {
                 System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
-                        + " variable in your environment to match the location of the RocketMQ installation%n");
+                    + " variable in your environment to match the location of the RocketMQ installation%n");
                 System.exit(-2);
             }
 
-            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
             JoranConfigurator configurator = new JoranConfigurator();
             configurator.setContext(lc);
             lc.reset();
@@ -132,7 +128,7 @@ public class FiltersrvStartup {
             log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
 
             final FiltersrvController controller =
-                    new FiltersrvController(filtersrvConfig, nettyServerConfig);
+                new FiltersrvController(filtersrvConfig, nettyServerConfig);
             boolean initResult = controller.initialize();
             if (!initResult) {
                 controller.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
index fd95685..11102d0 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
@@ -6,35 +6,43 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.filter;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
-import java.io.*;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.util.*;
-
-
 public class DynaCode {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
 
@@ -43,46 +51,35 @@ public class DynaCode {
     private static final String LINE_SP = System.getProperty("line.separator");
 
     private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP
-            + UtilAll.getPid();
+        + UtilAll.getPid();
 
     private String outPutClassPath = sourcePath;
 
-
     private ClassLoader parentClassLoader;
 
-
     private List<String> codeStrs;
 
-
     private Map<String/* fullClassName */, Class<?>/* class */> loadClass;
 
-
     private String classpath;
 
-
     private String bootclasspath;
 
-
     private String extdirs;
 
-
     private String encoding = "UTF-8";
 
-
     private String target;
 
-
     @SuppressWarnings("unchecked")
     public DynaCode(String code) {
         this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code));
     }
 
-
     public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) {
         this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs);
     }
 
-
     public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) {
         this.classpath = classpath;
         this.parentClassLoader = parentClassLoader;
@@ -90,12 +87,15 @@ public class DynaCode {
         this.loadClass = new HashMap<String, Class<?>>(codeStrs.size());
     }
 
+    public DynaCode(List<String> codeStrs) {
+        this(Thread.currentThread().getContextClassLoader(), codeStrs);
+    }
 
     private static String extractClasspath(ClassLoader cl) {
         StringBuffer buf = new StringBuffer();
         while (cl != null) {
             if (cl instanceof URLClassLoader) {
-                URL urls[] = ((URLClassLoader) cl).getURLs();
+                URL urls[] = ((URLClassLoader)cl).getURLs();
                 for (int i = 0; i < urls.length; i++) {
                     if (buf.length() > 0) {
                         buf.append(File.pathSeparatorChar);
@@ -115,13 +115,8 @@ public class DynaCode {
         return buf.toString();
     }
 
-
-    public DynaCode(List<String> codeStrs) {
-        this(Thread.currentThread().getContextClassLoader(), codeStrs);
-    }
-
     public static Class<?> compileAndLoadClass(final String className, final String javaSource)
-            throws Exception {
+        throws Exception {
         String classSimpleName = FilterAPI.simpleClassName(className);
         String javaCode = javaSource;
 
@@ -138,16 +133,6 @@ public class DynaCode {
         return clazz;
     }
 
-    public void compileAndLoadClass() throws Exception {
-        String[] sourceFiles = this.uploadSrcFile();
-        this.compile(sourceFiles);
-        this.loadClass(this.loadClass.keySet());
-    }
-
-    public Map<String, Class<?>> getLoadClass() {
-        return loadClass;
-    }
-
     public static String getQualifiedName(String code) {
         StringBuilder sb = new StringBuilder();
         String className = getClassName(code);
@@ -162,6 +147,57 @@ public class DynaCode {
         return sb.toString();
     }
 
+    public static String getClassName(String code) {
+        String className = StringUtils.substringBefore(code, "{");
+        if (StringUtils.isBlank(className)) {
+            return className;
+        }
+        if (StringUtils.contains(code, " class ")) {
+            className = StringUtils.substringAfter(className, " class ");
+            if (StringUtils.contains(className, " extends ")) {
+                className = StringUtils.substringBefore(className, " extends ").trim();
+            } else if (StringUtils.contains(className, " implements ")) {
+                className = StringUtils.trim(StringUtils.substringBefore(className, " implements "));
+            } else {
+                className = StringUtils.trim(className);
+            }
+        } else if (StringUtils.contains(code, " interface ")) {
+            className = StringUtils.substringAfter(className, " interface ");
+            if (StringUtils.contains(className, " extends ")) {
+                className = StringUtils.substringBefore(className, " extends ").trim();
+            } else {
+                className = StringUtils.trim(className);
+            }
+        } else if (StringUtils.contains(code, " enum ")) {
+            className = StringUtils.trim(StringUtils.substringAfter(className, " enum "));
+        } else {
+            return StringUtils.EMPTY;
+        }
+        return className;
+    }
+
+    public static String getPackageName(String code) {
+        String packageName =
+            StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim();
+        return packageName;
+    }
+
+    public static String getFullClassName(String code) {
+        String packageName = getPackageName(code);
+        String className = getClassName(code);
+        return StringUtils.isBlank(packageName) ? className : packageName + "." + className;
+    }
+
+    public void compileAndLoadClass() throws Exception {
+        String[] sourceFiles = this.uploadSrcFile();
+        this.compile(sourceFiles);
+        this.loadClass(this.loadClass.keySet());
+    }
+
+    public Map<String, Class<?>> getLoadClass() {
+        return loadClass;
+    }
+
     private String[] uploadSrcFile() throws Exception {
         List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size());
         for (String code : codeStrs) {
@@ -201,7 +237,7 @@ public class DynaCode {
                             srcFile.deleteOnExit();
                         }
                         OutputStreamWriter outputStreamWriter =
-                                new OutputStreamWriter(new FileOutputStream(srcFile), encoding);
+                            new OutputStreamWriter(new FileOutputStream(srcFile), encoding);
                         bufferWriter = new BufferedWriter(outputStreamWriter);
                         for (String lineCode : code.split(LINE_SP)) {
                             bufferWriter.write(lineCode);
@@ -225,7 +261,7 @@ public class DynaCode {
         JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
         if (compiler == null) {
             throw new NullPointerException(
-                    "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!");
+                "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!");
         }
         int resultCode = compiler.run(null, null, err, args);
         if (resultCode != 0) {
@@ -236,8 +272,8 @@ public class DynaCode {
     private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException {
         synchronized (loadClass) {
             ClassLoader classLoader =
-                    new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()},
-                            parentClassLoader);
+                new URLClassLoader(new URL[] {new File(outPutClassPath).toURI().toURL()},
+                    parentClassLoader);
             for (String key : classFullNames) {
                 Class<?> classz = classLoader.loadClass(key);
                 if (null != classz) {
@@ -250,47 +286,6 @@ public class DynaCode {
         }
     }
 
-    public static String getClassName(String code) {
-        String className = StringUtils.substringBefore(code, "{");
-        if (StringUtils.isBlank(className)) {
-            return className;
-        }
-        if (StringUtils.contains(code, " class ")) {
-            className = StringUtils.substringAfter(className, " class ");
-            if (StringUtils.contains(className, " extends ")) {
-                className = StringUtils.substringBefore(className, " extends ").trim();
-            } else if (StringUtils.contains(className, " implements ")) {
-                className = StringUtils.trim(StringUtils.substringBefore(className, " implements "));
-            } else {
-                className = StringUtils.trim(className);
-            }
-        } else if (StringUtils.contains(code, " interface ")) {
-            className = StringUtils.substringAfter(className, " interface ");
-            if (StringUtils.contains(className, " extends ")) {
-                className = StringUtils.substringBefore(className, " extends ").trim();
-            } else {
-                className = StringUtils.trim(className);
-            }
-        } else if (StringUtils.contains(code, " enum ")) {
-            className = StringUtils.trim(StringUtils.substringAfter(className, " enum "));
-        } else {
-            return StringUtils.EMPTY;
-        }
-        return className;
-    }
-
-    public static String getPackageName(String code) {
-        String packageName =
-                StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim();
-        return packageName;
-    }
-
-    public static String getFullClassName(String code) {
-        String packageName = getPackageName(code);
-        String className = getClassName(code);
-        return StringUtils.isBlank(packageName) ? className : packageName + "." + className;
-    }
-
     private String[] buildCompileJavacArgs(String srcFiles[]) {
         ArrayList<String> args = new ArrayList<String>();
         if (StringUtils.isNotBlank(classpath)) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
index 36d6b7e..89f1883 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.filter;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
index d278fe3..1cb0e96 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java
@@ -6,51 +6,44 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.filter;
 
 import org.apache.rocketmq.common.filter.MessageFilter;
 
-
 public class FilterClassInfo {
     private String className;
     private int classCRC;
     private MessageFilter messageFilter;
 
-
     public int getClassCRC() {
         return classCRC;
     }
 
-
     public void setClassCRC(int classCRC) {
         this.classCRC = classCRC;
     }
 
-
     public MessageFilter getMessageFilter() {
         return messageFilter;
     }
 
-
     public void setMessageFilter(MessageFilter messageFilter) {
         this.messageFilter = messageFilter;
     }
 
-
     public String getClassName() {
         return className;
     }
 
-
     public void setClassName(String className) {
         this.className = className;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
index 3269852..32f5ac2 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.filter;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
index fab4d7d..66389e0 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
@@ -6,17 +6,23 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.filter;
 
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
@@ -26,14 +32,6 @@ import org.apache.rocketmq.filtersrv.FiltersrvController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
 public class FilterClassManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
 
@@ -41,19 +39,21 @@ public class FilterClassManager {
     private final FiltersrvController filtersrvController;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
     private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
-            new ConcurrentHashMap<String, FilterClassInfo>(128);
+        new ConcurrentHashMap<String, FilterClassInfo>(128);
     private FilterClassFetchMethod filterClassFetchMethod;
 
-
     public FilterClassManager(FiltersrvController filtersrvController) {
         this.filtersrvController = filtersrvController;
         this.filterClassFetchMethod =
-                new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
-                        .getFilterClassRepertoryUrl());
+            new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
+                .getFilterClassRepertoryUrl());
     }
 
+    private static String buildKey(final String consumerGroup, final String topic) {
+        return topic + "@" + consumerGroup;
+    }
 
     public void start() {
         if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
@@ -75,20 +75,20 @@ public class FilterClassManager {
                 FilterClassInfo filterClassInfo = next.getValue();
                 String[] topicAndGroup = next.getKey().split("@");
                 String responseStr =
-                        this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
-                                filterClassInfo.getClassName());
+                    this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
+                        filterClassInfo.getClassName());
                 byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
                 int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8"));
                 if (classCRC != filterClassInfo.getClassCRC()) {
                     String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                     Class<?> newClass =
-                            DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
+                        DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
                     Object newInstance = newClass.newInstance();
-                    filterClassInfo.setMessageFilter((MessageFilter) newInstance);
+                    filterClassInfo.setMessageFilter((MessageFilter)newInstance);
                     filterClassInfo.setClassCRC(classCRC);
 
                     log.info("fetch Remote class File OK, {} {}", next.getKey(),
-                            filterClassInfo.getClassName());
+                        filterClassInfo.getClassName());
                 }
             } catch (Exception e) {
                 log.error("fetchClassFromRemoteHost Exception", e);
@@ -101,10 +101,9 @@ public class FilterClassManager {
     }
 
     public boolean registerFilterClass(final String consumerGroup, final String topic,
-                                       final String className, final int classCRC, final byte[] filterSourceBinary) {
+        final String className, final int classCRC, final byte[] filterSourceBinary) {
         final String key = buildKey(consumerGroup, topic);
 
-
         boolean registerNew = false;
         FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
         if (null == filterClassInfoPrev) {
@@ -135,17 +134,17 @@ public class FilterClassManager {
                         String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                         Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
                         Object newInstance = newClass.newInstance();
-                        filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
+                        filterClassInfoNew.setMessageFilter((MessageFilter)newInstance);
                         filterClassInfoNew.setClassCRC(classCRC);
                     }
 
                     this.filterClassTable.put(key, filterClassInfoNew);
                 } catch (Throwable e) {
                     String info =
-                            String
-                                    .format(
-                                            "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
-                                            consumerGroup, topic, className);
+                        String
+                            .format(
+                                "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
+                                consumerGroup, topic, className);
                     log.error(info, e);
                     return false;
                 }
@@ -155,20 +154,14 @@ public class FilterClassManager {
         return true;
     }
 
-    private static String buildKey(final String consumerGroup, final String topic) {
-        return topic + "@" + consumerGroup;
-    }
-
     public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) {
         return this.filterClassTable.get(buildKey(consumerGroup, topic));
     }
 
-
     public FilterClassFetchMethod getFilterClassFetchMethod() {
         return filterClassFetchMethod;
     }
 
-
     public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) {
         this.filterClassFetchMethod = filterClassFetchMethod;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
index c8b1515..99bfad0 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.filter;
@@ -23,17 +23,14 @@ import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
     private final String url;
 
-
     public HttpFilterClassFetchMethod(String url) {
         this.url = url;
     }
 
-
     @Override
     public String fetch(String topic, String consumerGroup, String className) {
         String thisUrl = String.format("%s/%s.java", this.url, className);
@@ -45,7 +42,7 @@ public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
             }
         } catch (Exception e) {
             log.error(
-                    String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
+                String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
index f2c98ae..1d56ac1 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.filtersrv.processor;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -39,36 +46,25 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.CommitLog;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-
 public class DefaultRequestProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
 
     private final FiltersrvController filtersrvController;
 
-
     public DefaultRequestProcessor(FiltersrvController filtersrvController) {
         this.filtersrvController = filtersrvController;
     }
 
-
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("receive request, {} {} {}",
-                    request.getCode(),
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                    request);
+                request.getCode(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                request);
         }
 
         switch (request.getCode()) {
@@ -89,14 +85,14 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
     private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final RegisterMessageFilterClassRequestHeader requestHeader =
-                (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
+            (RegisterMessageFilterClassRequestHeader)request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
 
         try {
             boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(),
-                    requestHeader.getTopic(),
-                    requestHeader.getClassName(),
-                    requestHeader.getClassCRC(),
-                    request.getBody());
+                requestHeader.getTopic(),
+                requestHeader.getClassName(),
+                requestHeader.getClassCRC(),
+                request.getBody());
             if (!ok) {
                 throw new Exception("registerFilterClass error");
             }
@@ -113,20 +109,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
 
     private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
         final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
-        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
         final PullMessageRequestHeader requestHeader =
-                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+            (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
 
         final FilterContext filterContext = new FilterContext();
         filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
 
-
         response.setOpaque(request.getOpaque());
 
         DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
         final FilterClassInfo findFilterClass =
-                this.filtersrvController.getFilterClassManager()
-                        .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
+            this.filtersrvController.getFilterClassManager()
+                .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
         if (null == findFilterClass) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("Find Filter class failed, not registered");
@@ -141,7 +136,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
 
         responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
 
-
         MessageQueue mq = new MessageQueue();
         mq.setTopic(requestHeader.getTopic());
         mq.setQueueId(requestHeader.getQueueId());
@@ -171,7 +165,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
                                 }
                             }
 
-
                             if (!msgListOK.isEmpty()) {
                                 returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
                                 return;
@@ -180,8 +173,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
                             }
                         } catch (Throwable e) {
                             final String error =
-                                    String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
-                                            requestHeader.getConsumerGroup(), requestHeader.getTopic());
+                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
+                                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
                             log.error(error, e);
 
                             response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -207,7 +200,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
                 returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
             }
 
-
             @Override
             public void onException(Throwable e) {
                 response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -223,7 +215,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
     }
 
     private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
-                                final List<MessageExt> msgList) {
+        final List<MessageExt> msgList) {
         if (null != msgList) {
             ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
             int bodyTotalSize = 0;
@@ -244,7 +236,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
 
             response.setBody(body.array());
 
-
             this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size());
 
             this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
@@ -285,23 +276,23 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET);
         final int propertiesLength = propertiesData.length;
         final int msgLen = 4 // 1 TOTALSIZE
-                + 4 // 2 MAGICCODE
-                + 4 // 3 BODYCRC
-                + 4 // 4 QUEUEID
-                + 4 // 5 FLAG
-                + 8 // 6 QUEUEOFFSET
-                + 8 // 7 PHYSICALOFFSET
-                + 4 // 8 SYSFLAG
-                + 8 // 9 BORNTIMESTAMP
-                + 8 // 10 BORNHOST
-                + 8 // 11 STORETIMESTAMP
-                + 8 // 12 STOREHOSTADDRESS
-                + 4 // 13 RECONSUMETIMES
-                + 8 // 14 Prepared Transaction Offset
-                + 4 + bodyLength // 14 BODY
-                + 1 + topicLength // 15 TOPIC
-                + 2 + propertiesLength // 16 propertiesLength
-                + 0;
+            + 4 // 2 MAGICCODE
+            + 4 // 3 BODYCRC
+            + 4 // 4 QUEUEID
+            + 4 // 5 FLAG
+            + 8 // 6 QUEUEOFFSET
+            + 8 // 7 PHYSICALOFFSET
+            + 4 // 8 SYSFLAG
+            + 8 // 9 BORNTIMESTAMP
+            + 8 // 10 BORNHOST
+            + 8 // 11 STORETIMESTAMP
+            + 8 // 12 STOREHOSTADDRESS
+            + 4 // 13 RECONSUMETIMES
+            + 8 // 14 Prepared Transaction Offset
+            + 4 + bodyLength // 15 BODY
+            + 1 + topicLength // 16 TOPIC
+            + 2 + propertiesLength // 17 PROPERTIES
+            + 0;
 
         ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
 
@@ -340,10 +331,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         if (bodyLength > 0)
             msgStoreItemMemory.put(msgInner.getBody());
         // 16 TOPIC
-        msgStoreItemMemory.put((byte) topicLength);
+        msgStoreItemMemory.put((byte)topicLength);
         msgStoreItemMemory.put(topicData);
         // 17 PROPERTIES
-        msgStoreItemMemory.putShort((short) propertiesLength);
+        msgStoreItemMemory.putShort((short)propertiesLength);
         if (propertiesLength > 0)
             msgStoreItemMemory.put(propertiesData);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
index 8665fbd..4f44e99 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java
@@ -6,59 +6,52 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.filtersrv.stats;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.stats.StatsItemSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-
 public class FilterServerStatsManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
     private final ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
 
     // ConsumerGroup Get Nums
     private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS",
-            this.scheduledExecutorService, log);
+        this.scheduledExecutorService, log);
 
     // ConsumerGroup Get Size
     private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE",
-            this.scheduledExecutorService, log);
-
+        this.scheduledExecutorService, log);
 
     public FilterServerStatsManager() {
     }
 
-
     public void start() {
     }
 
-
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
     }
 
-
     public void incGroupGetNums(final String group, final String topic, final int incValue) {
         this.groupGetNums.addValue(topic + "@" + group, incValue, 1);
     }
 
-
     public void incGroupGetSize(final String group, final String topic, final int incValue) {
         this.groupGetSize.addValue(topic + "@" + group, incValue, 1);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 2ec2f5f..27a1c84 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -15,7 +15,7 @@
    limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 4c286e0..b212adb 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -6,16 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.namesrv;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,12 +35,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
 public class NamesrvController {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
@@ -45,7 +43,7 @@ public class NamesrvController {
     private final NettyServerConfig nettyServerConfig;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "NSScheduledThread"));
+        "NSScheduledThread"));
     private final KVConfigManager kvConfigManager;
     private final RouteInfoManager routeInfoManager;
 
@@ -57,7 +55,6 @@ public class NamesrvController {
 
     private Configuration configuration;
 
-
     public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
         this.namesrvConfig = namesrvConfig;
         this.nettyServerConfig = nettyServerConfig;
@@ -65,26 +62,23 @@ public class NamesrvController {
         this.routeInfoManager = new RouteInfoManager();
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
         this.configuration = new Configuration(
-                log,
-                this.namesrvConfig, this.nettyServerConfig
+            log,
+            this.namesrvConfig, this.nettyServerConfig
         );
         this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
     }
 
-
     public boolean initialize() {
 
         this.kvConfigManager.load();
 
         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
 
-
         this.remotingExecutor =
-                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
+            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
 
         this.registerProcessor();
 
-
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
@@ -104,56 +98,47 @@ public class NamesrvController {
         return true;
     }
 
-
     private void registerProcessor() {
         if (namesrvConfig.isClusterTest()) {
 
             this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
-                    this.remotingExecutor);
+                this.remotingExecutor);
         } else {
 
             this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
         }
     }
 
-
     public void start() throws Exception {
         this.remotingServer.start();
     }
 
-
     public void shutdown() {
         this.remotingServer.shutdown();
         this.remotingExecutor.shutdown();
         this.scheduledExecutorService.shutdown();
     }
 
-
     public NamesrvConfig getNamesrvConfig() {
         return namesrvConfig;
     }
 
-
     public NettyServerConfig getNettyServerConfig() {
         return nettyServerConfig;
     }
 
-
     public KVConfigManager getKvConfigManager() {
         return kvConfigManager;
     }
 
-
     public RouteInfoManager getRouteInfoManager() {
         return routeInfoManager;
     }
 
-
     public RemotingServer getRemotingServer() {
         return remotingServer;
     }
 
-
     public void setRemotingServer(RemotingServer remotingServer) {
         this.remotingServer = remotingServer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index be824cd..0eb9a52 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -18,6 +18,15 @@ package org.apache.rocketmq.namesrv;
 
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -26,20 +35,9 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class NamesrvStartup {
     public static Properties properties = null;
     public static CommandLine commandLine = null;
@@ -51,12 +49,10 @@ public class NamesrvStartup {
     public static NamesrvController main0(String[] args) {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
 
-
         if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
             NettySystemConfig.socketSndbufSize = 4096;
         }
 
-
         if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
             NettySystemConfig.socketRcvbufSize = 4096;
         }
@@ -66,14 +62,13 @@ public class NamesrvStartup {
 
             Options options = ServerUtil.buildCommandlineOptions(new Options());
             commandLine =
-                    ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
-                            new PosixParser());
+                ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
+                    new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
                 return null;
             }
 
-
             final NamesrvConfig namesrvConfig = new NamesrvConfig();
             final NettyServerConfig nettyServerConfig = new NettyServerConfig();
             nettyServerConfig.setListenPort(9876);
@@ -93,7 +88,6 @@ public class NamesrvStartup {
                 }
             }
 
-
             if (commandLine.hasOption('p')) {
                 MixAll.printObjectProperties(null, namesrvConfig);
                 MixAll.printObjectProperties(null, nettyServerConfig);
@@ -104,22 +98,20 @@ public class NamesrvStartup {
 
             if (null == namesrvConfig.getRocketmqHome()) {
                 System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
-                        + " variable in your environment to match the location of the RocketMQ installation%n");
+                    + " variable in your environment to match the location of the RocketMQ installation%n");
                 System.exit(-2);
             }
 
-            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
             JoranConfigurator configurator = new JoranConfigurator();
             configurator.setContext(lc);
             lc.reset();
             configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
             final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
-
             MixAll.printObjectProperties(log, namesrvConfig);
             MixAll.printObjectProperties(log, nettyServerConfig);
 
-
             final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
 
             // remember all configs to prevent discard
@@ -135,7 +127,6 @@ public class NamesrvStartup {
                 private volatile boolean hasShutdown = false;
                 private AtomicInteger shutdownTimes = new AtomicInteger(0);
 
-
                 @Override
                 public void run() {
                     synchronized (this) {
@@ -151,7 +142,6 @@ public class NamesrvStartup {
                 }
             }, "ShutdownHook"));
 
-
             controller.start();
 
             String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();