You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:34 UTC

[17/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java
new file mode 100644
index 0000000..25d668c
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+/**
+ * Example that issues a single pull request against one hard-coded message
+ * queue and prints how long the call took together with the returned result.
+ */
+public class PullConsumerTest {
+    /**
+     * Starts the consumer, pulls once from queue 0 of "TopicTest3" beginning at
+     * offset 26, prints the elapsed milliseconds and the pull result, and finally
+     * shuts the consumer down. Any exception raised inside the pull section is
+     * printed rather than rethrown.
+     *
+     * @param args unused
+     * @throws MQClientException propagated from consumer calls made outside the try block
+     */
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+        pullConsumer.start();
+
+        try {
+            // The target queue is addressed explicitly by queue id, topic and broker name.
+            MessageQueue targetQueue = new MessageQueue();
+            targetQueue.setQueueId(0);
+            targetQueue.setTopic("TopicTest3");
+            targetQueue.setBrokerName("vivedeMacBook-Pro.local");
+
+            final long startOffset = 26;
+
+            final long startedAt = System.currentTimeMillis();
+            final PullResult result = pullConsumer.pullBlockIfNotFound(targetQueue, null, startOffset, 32);
+            final long elapsedMillis = System.currentTimeMillis() - startedAt;
+
+            System.out.printf("%s%n", elapsedMillis);
+            System.out.printf("%s%n", result);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        pullConsumer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java
new file mode 100644
index 0000000..0c86cf8
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.consumer.MQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.consumer.PullTaskCallback;
+import com.alibaba.rocketmq.client.consumer.PullTaskContext;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+
+
+/**
+ * Example that registers a pull task for "TopicTest1" on an
+ * {@link MQPullConsumerScheduleService} and then starts the service.
+ */
+public class PullScheduleService {
+
+    /**
+     * Builds the schedule service for group "GroupName1", sets the clustering
+     * message model, registers the pull task and starts the service.
+     *
+     * @param args unused
+     * @throws MQClientException propagated from the schedule service calls
+     */
+    public static void main(String[] args) throws MQClientException {
+        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
+        scheduleService.setMessageModel(MessageModel.CLUSTERING);
+
+        final PullTaskCallback pullTask = new PullTaskCallback() {
+
+            /**
+             * Pulls once from the given queue starting at its stored consume offset,
+             * prints the outcome, stores the next begin offset and requests a delay
+             * of 100 before the next pull. Exceptions are printed, not rethrown.
+             */
+            @Override
+            public void doPullTask(MessageQueue mq, PullTaskContext context) {
+                MQPullConsumer pullConsumer = context.getPullConsumer();
+                try {
+                    // A negative stored offset is replaced by 0.
+                    long startOffset = Math.max(0L, pullConsumer.fetchConsumeOffset(mq, false));
+
+                    PullResult result = pullConsumer.pull(mq, "*", startOffset, 32);
+                    System.out.printf("%s%n", startOffset + "\t" + mq + "\t" + result);
+
+                    // None of the statuses needs special handling in this example;
+                    // the switch is kept as the place to add per-status logic.
+                    switch (result.getPullStatus()) {
+                        case FOUND:
+                        case NO_MATCHED_MSG:
+                        case NO_NEW_MSG:
+                        case OFFSET_ILLEGAL:
+                        default:
+                            break;
+                    }
+
+                    pullConsumer.updateConsumeOffset(mq, result.getNextBeginOffset());
+                    context.setPullNextDelayTimeMillis(100);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        scheduleService.registerPullTaskCallback("TopicTest1", pullTask);
+
+        scheduleService.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
new file mode 100644
index 0000000..5628ced
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * Example push consumer that subscribes to "Jodie_topic_1023" with the
+ * expression "*" and prints every batch of messages handed to its listener.
+ */
+public class PushConsumer {
+
+    /**
+     * Creates the consumer for group "CID_JODIE_1", subscribes it, selects
+     * {@code CONSUME_FROM_FIRST_OFFSET}, registers the printing listener and
+     * starts the consumer.
+     *
+     * @param args unused
+     * @throws InterruptedException declared by the original signature
+     * @throws MQClientException propagated from the consumer calls
+     */
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+        consumer.subscribe("Jodie_topic_1023", "*");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            /**
+             * Prints the current thread's name and the delivered messages, then
+             * reports the batch as {@code CONSUME_SUCCESS}.
+             */
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                // Runtime data goes in as arguments, never into the format string itself:
+                // a '%' inside a message or thread name would otherwise be parsed as a
+                // conversion and could throw or garble the output.
+                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java
new file mode 100644
index 0000000..fc6bacd
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Keeps, for each message queue, a {@code CachedQueue} whose table maps queue
+ * offsets to messages, and reports the first cached offset of a queue.
+ */
+public class RandomAsyncCommit {
+    private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable =
+            new ConcurrentHashMap<MessageQueue, CachedQueue>();
+
+
+    /**
+     * Adds the given messages to the cache of {@code mq}, keyed by each
+     * message's queue offset. The cache for the queue is created on first use.
+     *
+     * @param mq   queue the messages belong to
+     * @param msgs messages to cache
+     */
+    public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) {
+        CachedQueue cachedQueue = this.mqCachedTable.get(mq);
+        if (null == cachedQueue) {
+            // putIfAbsent makes the create-on-first-use step atomic: if another
+            // thread registered a cache in the meantime, reuse that one instead of
+            // overwriting it (which would drop the messages it already holds).
+            final CachedQueue created = new CachedQueue();
+            final CachedQueue existing = this.mqCachedTable.putIfAbsent(mq, created);
+            cachedQueue = (null != existing) ? existing : created;
+        }
+        for (MessageExt msg : msgs) {
+            cachedQueue.getMsgCachedTable().put(msg.getQueueOffset(), msg);
+        }
+    }
+
+
+    /**
+     * Removes the message cached under {@code offset} for {@code mq}, if the
+     * queue has a cache.
+     *
+     * @param mq     queue whose cache is modified
+     * @param offset queue offset of the message to remove
+     */
+    public void removeMessage(final MessageQueue mq, long offset) {
+        CachedQueue cachedQueue = this.mqCachedTable.get(mq);
+        if (null != cachedQueue) {
+            cachedQueue.getMsgCachedTable().remove(offset);
+        }
+    }
+
+
+    /**
+     * Returns the first key of the cached table of {@code mq}.
+     *
+     * @param mq queue to inspect
+     * @return the first cached offset, or -1 when the queue has no cache or its
+     *         cache currently holds no messages
+     */
+    public long commitableOffset(final MessageQueue mq) {
+        CachedQueue cachedQueue = this.mqCachedTable.get(mq);
+        // Assumes the cached table is a sorted map whose firstKey() throws on an
+        // empty map - TODO confirm against CachedQueue. The emptiness check keeps a
+        // fully drained cache from raising instead of reporting "nothing to commit".
+        if (null != cachedQueue && !cachedQueue.getMsgCachedTable().isEmpty()) {
+            return cachedQueue.getMsgCachedTable().firstKey();
+        }
+
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java
new file mode 100644
index 0000000..68347a6
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+/**
+ * Example that sends one message to "TopicTest1" and then queries messages by
+ * the key it was sent with.
+ */
+public class TestProducer {
+    /**
+     * Starts the producer, runs a single send-then-query round, prints the send
+     * result and every message returned by the query, and shuts the producer
+     * down. Exceptions raised during the round are printed, not rethrown.
+     *
+     * @param args unused
+     * @throws MQClientException propagated from producer calls outside the try block
+     * @throws InterruptedException declared by the original signature
+     */
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+        producer.start();
+
+        for (int round = 0; round < 1; round++) {
+            try {
+                Message message = new Message("TopicTest1", "TagA", "key113",
+                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                SendResult sent = producer.send(message);
+                System.out.printf("%s%n", sent);
+
+                // Query by the same topic and key, from time 0 up to now, at most 10 results.
+                QueryResult found =
+                        producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
+                for (MessageExt match : found.getMessageList()) {
+                    System.out.printf("%s%n", match);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java
new file mode 100644
index 0000000..2e91e34
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.transaction;
+
+import com.alibaba.rocketmq.client.producer.LocalTransactionState;
+import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Example {@link TransactionCheckListener} that answers each check according
+ * to a running call counter rather than any real transaction state.
+ */
+public class TransactionCheckListenerImpl implements TransactionCheckListener {
+    /** Number of checks answered so far; starts at 0. */
+    private final AtomicInteger transactionIndex = new AtomicInteger(0);
+
+
+    /**
+     * Prints the checked message and derives the answer from the call counter.
+     *
+     * @param msg message whose transaction state is being checked
+     * @return {@code ROLLBACK_MESSAGE} when the counter is a multiple of 5,
+     *         {@code COMMIT_MESSAGE} when it is a multiple of 4, otherwise
+     *         {@code UNKNOW} (tested in that order, after the multiple-of-6 case)
+     * @throws RuntimeException when the counter is a multiple of 6, including
+     *         the very first call (counter 0)
+     */
+    @Override
+    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
+        // The message text is passed as an argument so that a '%' inside it is
+        // printed verbatim instead of being parsed as a format conversion.
+        System.out.printf("server checking TrMsg %s%n", msg.toString());
+
+        int value = transactionIndex.getAndIncrement();
+        if ((value % 6) == 0) {
+            throw new RuntimeException("Could not find db");
+        } else if ((value % 5) == 0) {
+            return LocalTransactionState.ROLLBACK_MESSAGE;
+        } else if ((value % 4) == 0) {
+            return LocalTransactionState.COMMIT_MESSAGE;
+        }
+
+        return LocalTransactionState.UNKNOW;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java
new file mode 100644
index 0000000..cda523a
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.example.transaction;
+
+import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
+import com.alibaba.rocketmq.client.producer.LocalTransactionState;
+import com.alibaba.rocketmq.common.message.Message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Example {@link LocalTransactionExecuter} that picks the local transaction
+ * outcome from a running call counter.
+ */
+public class TransactionExecuterImpl implements LocalTransactionExecuter {
+    private AtomicInteger transactionIndex = new AtomicInteger(1);
+
+
+    /**
+     * Chooses the outcome for one local transaction from the call counter.
+     *
+     * @param msg message being sent; not read here
+     * @param arg caller-supplied argument; not read here
+     * @return {@code ROLLBACK_MESSAGE} when the counter is a multiple of 5,
+     *         {@code COMMIT_MESSAGE} when it is a multiple of 4, otherwise
+     *         {@code UNKNOW}
+     * @throws RuntimeException when the counter value is exactly 0
+     */
+    @Override
+    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
+        final int callNumber = transactionIndex.getAndIncrement();
+
+        // NOTE(review): the counter starts at 1, so this is only reached if the
+        // counter wraps around to 0 - confirm whether that is intended.
+        if (callNumber == 0) {
+            throw new RuntimeException("Could not find db");
+        }
+        if (callNumber % 5 == 0) {
+            return LocalTransactionState.ROLLBACK_MESSAGE;
+        }
+        if (callNumber % 4 == 0) {
+            return LocalTransactionState.COMMIT_MESSAGE;
+        }
+        return LocalTransactionState.UNKNOW;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java
new file mode 100644
index 0000000..2c4745f
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.transaction;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
+import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Example that sends 100 transactional messages to "TopicTest" and then keeps
+ * the process alive for a long while before shutting the producer down.
+ */
+public class TransactionProducer {
+    /**
+     * Configures and starts a {@link TransactionMQProducer}, sends 100 messages
+     * through {@code sendMessageInTransaction} with a 10 ms pause after each
+     * successful send, then sleeps in one-second steps 100000 times and shuts
+     * the producer down.
+     *
+     * @param args unused
+     * @throws MQClientException propagated from producer calls outside the try block
+     * @throws InterruptedException if any of the sleeps is interrupted
+     */
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        TransactionCheckListener checkListener = new TransactionCheckListenerImpl();
+        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
+        producer.setCheckThreadPoolMinSize(2);
+        producer.setCheckThreadPoolMaxSize(2);
+        producer.setCheckRequestHoldMax(2000);
+        producer.setTransactionCheckListener(checkListener);
+        producer.start();
+
+        String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
+        TransactionExecuterImpl executer = new TransactionExecuterImpl();
+        for (int seq = 0; seq < 100; seq++) {
+            try {
+                // Tags are used round-robin; key and body carry the sequence number.
+                String tag = tags[seq % tags.length];
+                String key = "KEY" + seq;
+                byte[] body = ("Hello RocketMQ " + seq).getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+                Message message = new Message("TopicTest", tag, key, body);
+                SendResult sent = producer.sendMessageInTransaction(message, executer, null);
+                System.out.printf("%s%n", sent);
+
+                Thread.sleep(10);
+            } catch (MQClientException e) {
+                e.printStackTrace();
+            } catch (UnsupportedEncodingException e) {
+                e.printStackTrace();
+            }
+        }
+
+        // Keep the process running; presumably so that transaction checks can
+        // still reach the registered listener - verify against the broker side.
+        for (int tick = 0; tick < 100000; tick++) {
+            Thread.sleep(1000);
+        }
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/resources/MessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/resources/MessageFilterImpl.java b/rocketmq-example/src/main/resources/MessageFilterImpl.java
new file mode 100644
index 0000000..ea6f3d8
--- /dev/null
+++ b/rocketmq-example/src/main/resources/MessageFilterImpl.java
@@ -0,0 +1,22 @@
+package com.alibaba.rocketmq.example.filter;
+
+import com.alibaba.rocketmq.common.filter.MessageFilter;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+
+/**
+ * Example {@link MessageFilter} that selects messages by their "SequenceId"
+ * property.
+ */
+public class MessageFilterImpl implements MessageFilter {
+
+    /**
+     * Accepts a message when its "SequenceId" property is present, greater than
+     * 100 and a multiple of 10.
+     *
+     * @param msg message to test
+     * @return {@code true} when the message matches, {@code false} otherwise
+     *         (including when the property is absent)
+     * @throws NumberFormatException if the property is present but is not a
+     *         parsable integer
+     */
+    @Override
+    public boolean match(MessageExt msg) {
+        final String sequenceId = msg.getProperty("SequenceId");
+        if (sequenceId == null) {
+            return false;
+        }
+
+        final int id = Integer.parseInt(sequenceId);
+        return id > 100 && id % 10 == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/pom.xml b/rocketmq-filtersrv/pom.xml
new file mode 100644
index 0000000..28c360b
--- /dev/null
+++ b/rocketmq-filtersrv/pom.xml
@@ -0,0 +1,62 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-filtersrv</artifactId>
+    <name>rocketmq-filtersrv ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-store</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-srvutil</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java
new file mode 100644
index 0000000..b469b3f
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.filtersrv;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
+import com.alibaba.rocketmq.remoting.RemotingClient;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+
+/**
+ * Thin wrapper around a {@link RemotingClient} that lets a filter server
+ * register itself with a broker.
+ *
+ * @author shijia.wxr
+ */
+public class FilterServerOuterAPI {
+    private final RemotingClient remotingClient;
+
+
+    /**
+     * Creates the API backed by a {@link NettyRemotingClient} built from a
+     * default {@link NettyClientConfig}.
+     */
+    public FilterServerOuterAPI() {
+        this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
+    }
+
+
+    /** Starts the underlying remoting client. */
+    public void start() {
+        this.remotingClient.start();
+    }
+
+
+    /** Shuts the underlying remoting client down. */
+    public void shutdown() {
+        this.remotingClient.shutdown();
+    }
+
+
+    /**
+     * Sends a {@code REGISTER_FILTER_SERVER} request carrying the filter
+     * server's address to the broker and decodes the response header.
+     *
+     * @param brokerAddr       address of the broker the request is sent to
+     * @param filterServerAddr filter server address placed in the request header
+     * @return the decoded response header when the broker answers with
+     *         {@code ResponseCode.SUCCESS}
+     * @throws MQBrokerException carrying the response code and remark for any
+     *         other response code
+     * @throws RemotingCommandException propagated from the remoting layer
+     * @throws RemotingConnectException propagated from the remoting layer
+     * @throws RemotingSendRequestException propagated from the remoting layer
+     * @throws RemotingTimeoutException propagated from the remoting layer
+     * @throws InterruptedException propagated from the remoting layer
+     */
+    public RegisterFilterServerResponseHeader registerFilterServerToBroker(
+            final String brokerAddr,
+            final String filterServerAddr
+    ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RegisterFilterServerRequestHeader header = new RegisterFilterServerRequestHeader();
+        header.setFilterServerAddr(filterServerAddr);
+        RemotingCommand registerRequest =
+                RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, header);
+
+        // 3000 is the third argument of invokeSync - presumably a timeout in
+        // milliseconds; confirm against RemotingClient.
+        RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, registerRequest, 3000);
+        assert reply != null;
+
+        if (reply.getCode() == ResponseCode.SUCCESS) {
+            return (RegisterFilterServerResponseHeader) reply
+                    .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);
+        }
+
+        throw new MQBrokerException(reply.getCode(), reply.getRemark());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java
new file mode 100644
index 0000000..fac620f
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.annotation.ImportantField;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+
+
+/**
+ * Mutable configuration bean for the filter server. Every property has a default
+ * assigned at field initialization and a plain getter/setter pair.
+ */
+public class FiltersrvConfig {
+    // RocketMQ home directory: system property MixAll.ROCKETMQ_HOME_PROPERTY, falling back to
+    // the environment variable MixAll.ROCKETMQ_HOME_ENV (null when neither is set).
+    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
+            System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+    // Name server address: system property MixAll.NAMESRV_ADDR_PROPERTY, falling back to
+    // the environment variable MixAll.NAMESRV_ADDR_ENV.
+    @ImportantField
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
+            System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+    // Address ("host:port") of the broker this filter server connects to.
+    private String connectWhichBroker = "127.0.0.1:10911";
+    // IP of this filter server; defaults to the local address reported by RemotingUtil.
+    private String filterServerIP = RemotingUtil.getLocalAddress();
+
+    // NOTE(review): presumably a message-body size threshold in bytes (default 8 * 1024) above
+    // which bodies are compressed, and the zip level used for that - usage is not in this class.
+    private int compressMsgBodyOverHowmuch = 1024 * 8;
+    private int zipCompressLevel = 5;
+
+
+    // NOTE(review): presumably toggles whether clients may upload filter classes - confirm at the use site.
+    private boolean clientUploadFilterClassEnable = true;
+
+
+    // NOTE(review): presumably the repository URL filter classes are fetched from - confirm at the use site.
+    private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass";
+
+    // Sizing values for the filter server's remoting server (async semaphore, callback
+    // executor threads, worker threads).
+    private int fsServerAsyncSemaphoreValue = 2048;
+    private int fsServerCallbackExecutorThreads = 64;
+    private int fsServerWorkerThreads = 64;
+
+
+    public String getRocketmqHome() {
+        return rocketmqHome;
+    }
+
+
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+
+    public String getConnectWhichBroker() {
+        return connectWhichBroker;
+    }
+
+
+    public void setConnectWhichBroker(String connectWhichBroker) {
+        this.connectWhichBroker = connectWhichBroker;
+    }
+
+
+    public String getFilterServerIP() {
+        return filterServerIP;
+    }
+
+
+    public void setFilterServerIP(String filterServerIP) {
+        this.filterServerIP = filterServerIP;
+    }
+
+
+    public int getCompressMsgBodyOverHowmuch() {
+        return compressMsgBodyOverHowmuch;
+    }
+
+
+    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
+        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+    }
+
+
+    public int getZipCompressLevel() {
+        return zipCompressLevel;
+    }
+
+
+    public void setZipCompressLevel(int zipCompressLevel) {
+        this.zipCompressLevel = zipCompressLevel;
+    }
+
+
+    public boolean isClientUploadFilterClassEnable() {
+        return clientUploadFilterClassEnable;
+    }
+
+
+    public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) {
+        this.clientUploadFilterClassEnable = clientUploadFilterClassEnable;
+    }
+
+
+    public String getFilterClassRepertoryUrl() {
+        return filterClassRepertoryUrl;
+    }
+
+
+    public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) {
+        this.filterClassRepertoryUrl = filterClassRepertoryUrl;
+    }
+
+
+    public int getFsServerAsyncSemaphoreValue() {
+        return fsServerAsyncSemaphoreValue;
+    }
+
+
+    public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) {
+        this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue;
+    }
+
+
+    public int getFsServerCallbackExecutorThreads() {
+        return fsServerCallbackExecutorThreads;
+    }
+
+
+    public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) {
+        this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads;
+    }
+
+
+    public int getFsServerWorkerThreads() {
+        return fsServerWorkerThreads;
+    }
+
+
+    public void setFsServerWorkerThreads(int fsServerWorkerThreads) {
+        this.fsServerWorkerThreads = fsServerWorkerThreads;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java
new file mode 100644
index 0000000..0e3f696
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.filtersrv;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
+import com.alibaba.rocketmq.filtersrv.filter.FilterClassManager;
+import com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor;
+import com.alibaba.rocketmq.filtersrv.stats.FilterServerStatsManager;
+import com.alibaba.rocketmq.remoting.RemotingServer;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Owns and drives the lifecycle of the filter server's components: the Netty remoting
+ * server and its request executor, the pull consumer, the broker-facing outer API, the
+ * filter class manager, the stats manager and the periodic "register to broker" task.
+ *
+ * <p>Expected call order is {@link #initialize()}, {@link #start()} and finally
+ * {@link #shutdown()}.
+ *
+ * @author shijia.wxr
+ */
+public class FiltersrvController {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    private final FiltersrvConfig filtersrvConfig;
+
+    private final NettyServerConfig nettyServerConfig;
+    private final FilterClassManager filterClassManager;
+
+    private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();
+    private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
+            MixAll.FILTERSRV_CONSUMER_GROUP);
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
+    private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();
+
+    // Both are created in initialize(); they are null until then.
+    private RemotingServer remotingServer;
+
+    private ExecutorService remotingExecutor;
+    // Name of the broker this filter server registered with; set on the first successful registration.
+    private volatile String brokerName = null;
+
+
+    /**
+     * @param filtersrvConfig filter server settings
+     * @param nettyServerConfig settings for the remoting server created in {@link #initialize()}
+     */
+    public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
+        this.filtersrvConfig = filtersrvConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.filterClassManager = new FilterClassManager(this);
+    }
+
+
+    /**
+     * Creates the remoting server and its executor, registers the default request processor,
+     * schedules the periodic broker registration (first run after 3 s, then every 10 s) and
+     * configures the pull consumer.
+     *
+     * @return always {@code true}
+     */
+    public boolean initialize() {
+
+        MixAll.printObjectProperties(log, this.filtersrvConfig);
+
+
+        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);
+
+
+        this.remotingExecutor =
+                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
+                        new ThreadFactoryImpl("RemotingExecutorThread_"));
+
+        this.registerProcessor();
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+            @Override
+            public void run() {
+                FiltersrvController.this.registerFilterServerToBroker();
+            }
+        }, 3, 10, TimeUnit.SECONDS);
+
+        // Use suspend/timeout values one second shorter than the consumer's defaults.
+        this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer
+                .getBrokerSuspendMaxTimeMillis() - 1000);
+        this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer
+                .getConsumerTimeoutMillisWhenSuspend() - 1000);
+
+        this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
+        this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));
+
+        return true;
+    }
+
+    /** Installs {@link DefaultRequestProcessor} as the default processor, run on the remoting executor. */
+    private void registerProcessor() {
+        this.remotingServer
+                .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
+    }
+
+    /**
+     * Registers this filter server with the configured broker and records the broker id
+     * (as the pull consumer's default broker id) and, on first success, the broker name.
+     *
+     * <p>Any failure is treated as fatal: it is logged and the JVM exits with status -1.
+     */
+    public void registerFilterServerToBroker() {
+        try {
+            RegisterFilterServerResponseHeader responseHeader =
+                    this.filterServerOuterAPI.registerFilterServerToBroker(
+                            this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
+            this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
+                    .setDefaultBrokerId(responseHeader.getBrokerId());
+
+            if (null == this.brokerName) {
+                this.brokerName = responseHeader.getBrokerName();
+            }
+
+            log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
+                    this.localAddr(),
+                    this.filtersrvConfig.getConnectWhichBroker(),
+                    responseHeader.getBrokerName(),
+                    responseHeader.getBrokerId());
+        } catch (Exception e) {
+            log.warn("register filter server Exception", e);
+
+            log.warn("access broker failed, kill oneself");
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * @return this filter server's address as {@code ip:port}, built from the configured IP
+     *         and the remoting server's local listen port
+     */
+    public String localAddr() {
+        return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(),
+                this.remotingServer.localListenPort());
+    }
+
+    /**
+     * Starts the pull consumer, remoting server, outer API, filter class manager and stats
+     * manager, in that order.
+     *
+     * @throws Exception if any component fails to start
+     */
+    public void start() throws Exception {
+        this.defaultMQPullConsumer.start();
+        this.remotingServer.start();
+        this.filterServerOuterAPI.start();
+        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
+                .setConnectBrokerByUser(true);
+        this.filterClassManager.start();
+        this.filterServerStatsManager.start();
+    }
+
+
+    /**
+     * Shuts down every component. Safe to call on a controller whose {@link #initialize()}
+     * was never run.
+     */
+    public void shutdown() {
+        // remotingServer and remotingExecutor only exist after initialize(); guard them so
+        // that an early shutdown does not fail with a NullPointerException and skip the
+        // components below.
+        if (this.remotingServer != null) {
+            this.remotingServer.shutdown();
+        }
+        if (this.remotingExecutor != null) {
+            this.remotingExecutor.shutdown();
+        }
+        this.scheduledExecutorService.shutdown();
+        this.defaultMQPullConsumer.shutdown();
+        this.filterServerOuterAPI.shutdown();
+        this.filterClassManager.shutdown();
+        this.filterServerStatsManager.shutdown();
+    }
+
+
+    public RemotingServer getRemotingServer() {
+        return remotingServer;
+    }
+
+
+    public void setRemotingServer(RemotingServer remotingServer) {
+        this.remotingServer = remotingServer;
+    }
+
+
+    public ExecutorService getRemotingExecutor() {
+        return remotingExecutor;
+    }
+
+
+    public void setRemotingExecutor(ExecutorService remotingExecutor) {
+        this.remotingExecutor = remotingExecutor;
+    }
+
+
+    public FiltersrvConfig getFiltersrvConfig() {
+        return filtersrvConfig;
+    }
+
+
+    public NettyServerConfig getNettyServerConfig() {
+        return nettyServerConfig;
+    }
+
+
+    public ScheduledExecutorService getScheduledExecutorService() {
+        return scheduledExecutorService;
+    }
+
+
+    public FilterServerOuterAPI getFilterServerOuterAPI() {
+        return filterServerOuterAPI;
+    }
+
+
+    public FilterClassManager getFilterClassManager() {
+        return filterClassManager;
+    }
+
+
+    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
+        return defaultMQPullConsumer;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public FilterServerStatsManager getFilterServerStatsManager() {
+        return filterServerStatsManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java
new file mode 100644
index 0000000..3fe6b22
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.filtersrv;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.remoting.netty.NettySystemConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class FiltersrvStartup {
+    public static Logger log;
+
+    public static void main(String[] args) {
+        start(createController(args));
+    }
+
+    public static FiltersrvController start(FiltersrvController controller) {
+
+        try {
+            controller.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        String tip = "The Filter Server boot success, " + controller.localAddr();
+        log.info(tip);
+        System.out.printf("%s%n", tip);
+
+        return controller;
+    }
+
+    public static FiltersrvController createController(String[] args) {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+            NettySystemConfig.socketSndbufSize = 65535;
+        }
+
+
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+            NettySystemConfig.socketRcvbufSize = 1024;
+        }
+
+        try {
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            final CommandLine commandLine =
+                    ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options),
+                            new PosixParser());
+            if (null == commandLine) {
+                System.exit(-1);
+                return null;
+            }
+
+            final FiltersrvConfig filtersrvConfig = new FiltersrvConfig();
+            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+
+            if (commandLine.hasOption('c')) {
+                String file = commandLine.getOptionValue('c');
+                if (file != null) {
+                    InputStream in = new BufferedInputStream(new FileInputStream(file));
+                    Properties properties = new Properties();
+                    properties.load(in);
+                    MixAll.properties2Object(properties, filtersrvConfig);
+                    System.out.printf("load config properties file OK, " + file + "%n");
+                    in.close();
+
+                    String port = properties.getProperty("listenPort");
+                    if (port != null) {
+                        filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port));
+                    }
+                }
+            }
+
+            nettyServerConfig.setListenPort(0);
+            nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue());
+            nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig
+                    .getFsServerCallbackExecutorThreads());
+            nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads());
+
+            if (commandLine.hasOption('p')) {
+                MixAll.printObjectProperties(null, filtersrvConfig);
+                MixAll.printObjectProperties(null, nettyServerConfig);
+                System.exit(0);
+            }
+
+            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);
+            if (null == filtersrvConfig.getRocketmqHome()) {
+                System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+                        + " variable in your environment to match the location of the RocketMQ installation%n");
+                System.exit(-2);
+            }
+
+            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            JoranConfigurator configurator = new JoranConfigurator();
+            configurator.setContext(lc);
+            lc.reset();
+            configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml");
+            log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+            final FiltersrvController controller =
+                    new FiltersrvController(filtersrvConfig, nettyServerConfig);
+            boolean initResult = controller.initialize();
+            if (!initResult) {
+                controller.shutdown();
+                System.exit(-3);
+            }
+
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+                private volatile boolean hasShutdown = false;
+                private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+                @Override
+                public void run() {
+                    synchronized (this) {
+                        log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
+                        if (!this.hasShutdown) {
+                            this.hasShutdown = true;
+                            long begineTime = System.currentTimeMillis();
+                            controller.shutdown();
+                            long consumingTimeTotal = System.currentTimeMillis() - begineTime;
+                            log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
+                        }
+                    }
+                }
+            }, "ShutdownHook"));
+
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+        return null;
+    }
+
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Filter server config properties file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "printConfigItem", false, "Print all config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java
new file mode 100644
index 0000000..e17e5d2
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.filter;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.filter.FilterAPI;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.util.*;
+
+
+public class DynaCode {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    // Platform file separator, used when building source/output paths.
+    private static final String FILE_SP = System.getProperty("file.separator");
+
+    // Platform line separator, used to split source text into lines when writing it out.
+    private static final String LINE_SP = System.getProperty("line.separator");
+
+    // Directory the generated .java files are written to:
+    // <user.home>/rocketmq_filter_class/<pid>
+    private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP
+            + UtilAll.getPid();
+
+    // Root directory compiled classes are loaded from; defaults to the source directory.
+    private String outPutClassPath = sourcePath;
+
+
+    // Parent of the class loader that loads the compiled classes.
+    private ClassLoader parentClassLoader;
+
+
+    // Java source texts to compile, one compilation unit per element.
+    private List<String> codeStrs;
+
+
+    // Fully qualified class name -> loaded class (value is null until the class has been loaded).
+    private Map<String/* fullClassName */, Class<?>/* class */> loadClass;
+
+
+    // Classpath handed to the compiler.
+    private String classpath;
+
+
+    // NOTE(review): presumably forwarded to javac as -bootclasspath; never assigned in the visible code.
+    private String bootclasspath;
+
+
+    // NOTE(review): presumably forwarded to javac as -extdirs; never assigned in the visible code.
+    private String extdirs;
+
+
+    // Charset used when writing the source files.
+    private String encoding = "UTF-8";
+
+
+    // NOTE(review): presumably forwarded to javac as -target; never assigned in the visible code.
+    private String target;
+
+
+    /**
+     * Creates an instance for a single source text, using the current thread's context
+     * class loader as parent.
+     *
+     * @param code Java source of one compilation unit
+     */
+    @SuppressWarnings("unchecked")
+    public DynaCode(String code) {
+        this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code));
+    }
+
+
+    /**
+     * Creates an instance whose compile classpath is derived from the URLs of the given
+     * class loader and its ancestors.
+     *
+     * @param parentClassLoader parent for the loader of the compiled classes
+     * @param codeStrs Java source texts to compile
+     */
+    public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) {
+        this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs);
+    }
+
+
+    /**
+     * Creates an instance with an explicit compile classpath.
+     *
+     * @param classpath classpath string handed to the compiler
+     * @param parentClassLoader parent for the loader of the compiled classes
+     * @param codeStrs Java source texts to compile; must not be null (its size is read here)
+     */
+    public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) {
+        this.classpath = classpath;
+        this.parentClassLoader = parentClassLoader;
+        this.codeStrs = codeStrs;
+        this.loadClass = new HashMap<String, Class<?>>(codeStrs.size());
+    }
+
+
+    /**
+     * Builds a classpath string from the URLs of every {@link URLClassLoader} found while
+     * walking from {@code cl} up through its parents. Loaders of other types contribute nothing.
+     *
+     * @param cl class loader to start from; may be null
+     * @return absolute paths joined with {@link File#pathSeparatorChar}, or an empty string
+     */
+    private static String extractClasspath(ClassLoader cl) {
+        StringBuilder buf = new StringBuilder();
+        for (ClassLoader current = cl; current != null; current = current.getParent()) {
+            if (!(current instanceof URLClassLoader)) {
+                continue;
+            }
+            for (URL url : ((URLClassLoader) current).getURLs()) {
+                String path;
+                try {
+                    path = URLDecoder.decode(url.getFile(), "UTF-8");
+                } catch (UnsupportedEncodingException e) {
+                    // Skip this entry; nothing has been appended for it yet.
+                    continue;
+                }
+                // Append the separator only once the entry is known to be usable, so a
+                // skipped entry cannot leave a dangling or doubled separator behind.
+                if (buf.length() > 0) {
+                    buf.append(File.pathSeparatorChar);
+                }
+                buf.append(new File(path).getAbsolutePath());
+            }
+        }
+        return buf.toString();
+    }
+
+
+    /**
+     * Creates an instance for the given sources, using the current thread's context class
+     * loader as parent.
+     *
+     * @param codeStrs Java source texts to compile
+     */
+    public DynaCode(List<String> codeStrs) {
+        this(Thread.currentThread().getContextClassLoader(), codeStrs);
+    }
+
+    /**
+     * Compiles the given source under a unique class name and loads the resulting class.
+     *
+     * <p>Every occurrence of the class's simple name in the source is replaced by the simple
+     * name suffixed with the current time in milliseconds, so repeated uploads of the same
+     * class do not collide.
+     *
+     * @param className name of the class declared in {@code javaSource}
+     * @param javaSource Java source text
+     * @return the loaded class, or {@code null} if no class was registered under the
+     *         renamed qualified name
+     * @throws Exception if writing, compiling or loading fails
+     */
+    public static Class<?> compileAndLoadClass(final String className, final String javaSource)
+            throws Exception {
+        final String classSimpleName = FilterAPI.simpleClassName(className);
+        final String newClassSimpleName = classSimpleName + System.currentTimeMillis();
+        // Literal replacement: String.replaceAll would treat the name as a regex and the
+        // replacement as a group template, so a name containing '$' would fail or mis-replace.
+        final String newJavaCode = javaSource.replace(classSimpleName, newClassSimpleName);
+
+        List<String> codes = new ArrayList<String>();
+        codes.add(newJavaCode);
+        DynaCode dc = new DynaCode(codes);
+        dc.compileAndLoadClass();
+
+        return dc.getLoadClass().get(getQualifiedName(newJavaCode));
+    }
+
+    /**
+     * Writes the sources to disk, compiles them, then loads every class registered while
+     * the sources were written.
+     *
+     * @throws Exception if writing, compiling or loading fails
+     */
+    public void compileAndLoadClass() throws Exception {
+        String[] sourceFiles = this.uploadSrcFile();
+        this.compile(sourceFiles);
+        this.loadClass(this.loadClass.keySet());
+    }
+
+    /**
+     * @return the internal map from fully qualified class name to loaded class (not a copy)
+     */
+    public Map<String, Class<?>> getLoadClass() {
+        return loadClass;
+    }
+
+    /**
+     * Derives the qualified type name declared by a source text.
+     *
+     * @param code Java source text
+     * @return {@code package.Type}, just {@code Type} when no package is declared, or an
+     *         empty string when no type name can be extracted
+     */
+    public static String getQualifiedName(String code) {
+        final String simpleName = getClassName(code);
+        if (StringUtils.isBlank(simpleName)) {
+            return "";
+        }
+
+        final String pkg = getPackageName(code);
+        return StringUtils.isNotBlank(pkg) ? pkg + "." + simpleName : simpleName;
+    }
+
+    private String[] uploadSrcFile() throws Exception {
+        List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size());
+        for (String code : codeStrs) {
+            if (StringUtils.isNotBlank(code)) {
+                String packageName = getPackageName(code);
+                String className = getClassName(code);
+                if (StringUtils.isNotBlank(className)) {
+                    File srcFile = null;
+                    BufferedWriter bufferWriter = null;
+                    try {
+                        if (StringUtils.isBlank(packageName)) {
+                            File pathFile = new File(sourcePath);
+
+                            if (!pathFile.exists()) {
+                                if (!pathFile.mkdirs()) {
+                                    throw new RuntimeException("create PathFile Error!");
+                                }
+                            }
+                            srcFile = new File(sourcePath + FILE_SP + className + ".java");
+                        } else {
+                            String srcPath = StringUtils.replace(packageName, ".", FILE_SP);
+                            File pathFile = new File(sourcePath + FILE_SP + srcPath);
+
+                            if (!pathFile.exists()) {
+                                if (!pathFile.mkdirs()) {
+                                    throw new RuntimeException("create PathFile Error!");
+                                }
+                            }
+                            srcFile = new File(pathFile.getAbsolutePath() + FILE_SP + className + ".java");
+                        }
+                        synchronized (loadClass) {
+                            loadClass.put(getFullClassName(code), null);
+                        }
+                        if (null != srcFile) {
+                            LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath());
+                            srcFileAbsolutePaths.add(srcFile.getAbsolutePath());
+                            srcFile.deleteOnExit();
+                        }
+                        OutputStreamWriter outputStreamWriter =
+                                new OutputStreamWriter(new FileOutputStream(srcFile), encoding);
+                        bufferWriter = new BufferedWriter(outputStreamWriter);
+                        for (String lineCode : code.split(LINE_SP)) {
+                            bufferWriter.write(lineCode);
+                            bufferWriter.newLine();
+                        }
+                        bufferWriter.flush();
+                    } finally {
+                        if (null != bufferWriter) {
+                            bufferWriter.close();
+                        }
+                    }
+                }
+            }
+        }
+        return srcFileAbsolutePaths.toArray(new String[srcFileAbsolutePaths.size()]);
+    }
+
+    private void compile(String[] srcFiles) throws Exception {
+        String args[] = this.buildCompileJavacArgs(srcFiles);
+        ByteArrayOutputStream err = new ByteArrayOutputStream();
+        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+        if (compiler == null) {
+            throw new NullPointerException(
+                    "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!");
+        }
+        int resultCode = compiler.run(null, null, err, args);
+        if (resultCode != 0) {
+            throw new Exception(err.toString(RemotingHelper.DEFAULT_CHARSET));
+        }
+    }
+
+    private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException {
+        synchronized (loadClass) {
+            ClassLoader classLoader =
+                    new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()},
+                            parentClassLoader);
+            for (String key : classFullNames) {
+                Class<?> classz = classLoader.loadClass(key);
+                if (null != classz) {
+                    loadClass.put(key, classz);
+                    LOGGER.info("Dyna Load Java Class File OK:----> className: " + key);
+                } else {
+                    LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key);
+                }
+            }
+        }
+    }
+
+    /**
+     * Extracts the simple name of the type declared by a source text, using plain text
+     * matching on the part of the source that precedes the first <code>{</code>.
+     *
+     * <p>The declaration kind is picked by looking for {@code " class "}, then
+     * {@code " interface "}, then {@code " enum "} anywhere in the source; the name is the
+     * header text after that keyword, cut at {@code " extends "} (class/interface) or
+     * {@code " implements "} (class only), then trimmed.
+     *
+     * @param code Java source text
+     * @return the trimmed name; the header itself when it is blank (possibly {@code null});
+     *         an empty string when none of the keywords occurs
+     */
+    public static String getClassName(String code) {
+        final String header = StringUtils.substringBefore(code, "{");
+        if (StringUtils.isBlank(header)) {
+            return header;
+        }
+
+        final boolean isClass = StringUtils.contains(code, " class ");
+        final boolean isInterface = !isClass && StringUtils.contains(code, " interface ");
+        final String keyword;
+        if (isClass) {
+            keyword = " class ";
+        } else if (isInterface) {
+            keyword = " interface ";
+        } else if (StringUtils.contains(code, " enum ")) {
+            keyword = " enum ";
+        } else {
+            return StringUtils.EMPTY;
+        }
+
+        String name = StringUtils.substringAfter(header, keyword);
+        if ((isClass || isInterface) && StringUtils.contains(name, " extends ")) {
+            name = StringUtils.substringBefore(name, " extends ");
+        } else if (isClass && StringUtils.contains(name, " implements ")) {
+            name = StringUtils.substringBefore(name, " implements ");
+        }
+        return StringUtils.trim(name);
+    }
+
+    /**
+     * Returns the text between the first {@code "package "} and the next
+     * {@code ";"} in {@code code}, trimmed.
+     *
+     * @param code Java source text
+     * @return the package name, or an empty string when {@code "package "} does not occur
+     */
+    public static String getPackageName(String code) {
+        final String afterKeyword = StringUtils.substringAfter(code, "package ");
+        final String declaration = StringUtils.substringBefore(afterKeyword, ";");
+        return declaration.trim();
+    }
+
+    /**
+     * Builds the fully qualified name of the type declared in {@code code} from
+     * {@link #getPackageName(String)} and {@link #getClassName(String)}.
+     *
+     * @param code Java source text
+     * @return {@code package.ClassName}, or just the class name when the package is blank
+     */
+    public static String getFullClassName(String code) {
+        final String pkg = getPackageName(code);
+        final String simpleName = getClassName(code);
+        if (StringUtils.isBlank(pkg)) {
+            return simpleName;
+        }
+        return pkg + "." + simpleName;
+    }
+
+    /**
+     * Assembles the javac command line: every configured (non-blank) option as a
+     * flag/value pair, in a fixed order, followed by the source files.
+     *
+     * @param srcFiles source file paths appended after the options
+     * @return the argument array for the compiler
+     */
+    private String[] buildCompileJavacArgs(String srcFiles[]) {
+        final ArrayList<String> javacArgs = new ArrayList<String>();
+        appendOption(javacArgs, "-classpath", classpath);
+        appendOption(javacArgs, "-d", outPutClassPath);
+        appendOption(javacArgs, "-sourcepath", sourcePath);
+        appendOption(javacArgs, "-bootclasspath", bootclasspath);
+        appendOption(javacArgs, "-extdirs", extdirs);
+        appendOption(javacArgs, "-encoding", encoding);
+        appendOption(javacArgs, "-target", target);
+        for (String srcFile : srcFiles) {
+            javacArgs.add(srcFile);
+        }
+        return javacArgs.toArray(new String[javacArgs.size()]);
+    }
+
+    /** Adds {@code flag} followed by {@code value} to {@code args} unless {@code value} is blank. */
+    private static void appendOption(ArrayList<String> args, String flag, String value) {
+        if (StringUtils.isNotBlank(value)) {
+            args.add(flag);
+            args.add(value);
+        }
+    }
+
+    /**
+     * @return the directory passed to javac as {@code -d}; compiled classes are
+     *         also loaded from this location
+     */
+    public String getOutPutClassPath() {
+        return outPutClassPath;
+    }
+
+    /**
+     * @param outPutClassPath directory passed to javac as {@code -d} and used as
+     *                        the root for loading compiled classes
+     */
+    public void setOutPutClassPath(String outPutClassPath) {
+        this.outPutClassPath = outPutClassPath;
+    }
+
+    /**
+     * @return the value passed to javac as {@code -sourcepath}
+     */
+    public String getSourcePath() {
+        return sourcePath;
+    }
+
+    /**
+     * @param sourcePath value passed to javac as {@code -sourcepath}; the option
+     *                   is omitted when blank
+     */
+    public void setSourcePath(String sourcePath) {
+        this.sourcePath = sourcePath;
+    }
+
+    /**
+     * @return the class loader used as parent of the loader that loads the
+     *         compiled classes
+     */
+    public ClassLoader getParentClassLoader() {
+        return parentClassLoader;
+    }
+
+    /**
+     * @param parentClassLoader class loader to use as parent of the loader that
+     *                          loads the compiled classes
+     */
+    public void setParentClassLoader(ClassLoader parentClassLoader) {
+        this.parentClassLoader = parentClassLoader;
+    }
+
+    /**
+     * @return the value passed to javac as {@code -classpath}
+     */
+    public String getClasspath() {
+        return classpath;
+    }
+
+    /**
+     * @param classpath value passed to javac as {@code -classpath}; the option
+     *                  is omitted when blank
+     */
+    public void setClasspath(String classpath) {
+        this.classpath = classpath;
+    }
+
+    /**
+     * @return the value passed to javac as {@code -bootclasspath}
+     */
+    public String getBootclasspath() {
+        return bootclasspath;
+    }
+
+    /**
+     * @param bootclasspath value passed to javac as {@code -bootclasspath}; the
+     *                      option is omitted when blank
+     */
+    public void setBootclasspath(String bootclasspath) {
+        this.bootclasspath = bootclasspath;
+    }
+
+    /**
+     * @return the value passed to javac as {@code -extdirs}
+     */
+    public String getExtdirs() {
+        return extdirs;
+    }
+
+    /**
+     * @param extdirs value passed to javac as {@code -extdirs}; the option is
+     *                omitted when blank
+     */
+    public void setExtdirs(String extdirs) {
+        this.extdirs = extdirs;
+    }
+
+    /**
+     * @return the value passed to javac as {@code -encoding}
+     */
+    public String getEncoding() {
+        return encoding;
+    }
+
+    /**
+     * @param encoding value passed to javac as {@code -encoding}; the option is
+     *                 omitted when blank
+     */
+    public void setEncoding(String encoding) {
+        this.encoding = encoding;
+    }
+
+    /**
+     * @return the value passed to javac as {@code -target}
+     */
+    public String getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target value passed to javac as {@code -target}; the option is
+     *               omitted when blank
+     */
+    public void setTarget(String target) {
+        this.target = target;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
new file mode 100644
index 0000000..27b19ce
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.filter;
+
+/**
+ * Strategy for obtaining the Java source text of a message filter class.
+ */
+public interface FilterClassFetchMethod {
+    /**
+     * Fetches the source of a filter class.
+     *
+     * @param topic         topic the filter is registered for
+     * @param consumerGroup consumer group the filter is registered for
+     * @param className     name of the filter class whose source is wanted
+     * @return the source text; implementations may return {@code null} when the
+     *         source cannot be obtained
+     */
+    String fetch(String topic, String consumerGroup, String className);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java
new file mode 100644
index 0000000..f3e747e
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.filter;
+
+import com.alibaba.rocketmq.common.filter.MessageFilter;
+
+
+/**
+ * Mutable record describing one registered filter class: its name, the
+ * checksum of its source and the filter instance created from it.
+ */
+public class FilterClassInfo {
+    /** Name of the filter class. */
+    private String className;
+    /** Checksum of the filter source; compared to decide whether the source changed. */
+    private int classCRC;
+    /** Instance of the filter class; may be {@code null} while no source has been compiled. */
+    private MessageFilter messageFilter;
+
+
+    public String getClassName() {
+        return this.className;
+    }
+
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+
+    public int getClassCRC() {
+        return this.classCRC;
+    }
+
+
+    public void setClassCRC(int classCRC) {
+        this.classCRC = classCRC;
+    }
+
+
+    public MessageFilter getMessageFilter() {
+        return this.messageFilter;
+    }
+
+
+    public void setMessageFilter(MessageFilter messageFilter) {
+        this.messageFilter = messageFilter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java
new file mode 100644
index 0000000..8966ca2
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.filter;
+
+/**
+ * Class loader that exposes {@link ClassLoader#defineClass(String, byte[], int, int)}
+ * so callers can turn raw class bytes into a {@link Class}.
+ */
+public class FilterClassLoader extends ClassLoader {
+    /**
+     * Defines a class from a slice of a byte array.
+     *
+     * @param name expected binary name of the class
+     * @param b    buffer holding the class file data
+     * @param off  offset of the first class file byte in {@code b}
+     * @param len  number of class file bytes
+     * @return the newly defined class
+     * @throws ClassFormatError if the bytes are not a valid class file
+     */
+    public final Class<?> createNewClass(String name, byte[] b, int off, int len) throws ClassFormatError {
+        final Class<?> defined = super.defineClass(name, b, off, len);
+        return defined;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java
new file mode 100644
index 0000000..618db8e
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.filter;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.filter.MessageFilter;
+import com.alibaba.rocketmq.filtersrv.FiltersrvController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Keeps the message filter classes known to a filter server, one entry per
+ * {@code topic@consumerGroup} key.
+ * <p>
+ * When client upload is enabled, the filter source arrives with the
+ * registration request and is compiled immediately. When it is disabled,
+ * registration only creates a placeholder entry and a scheduled task polls the
+ * configured {@link FilterClassFetchMethod} once a minute, recompiling whenever
+ * the checksum of the fetched source differs from the stored one.
+ */
+public class FilterClassManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    /** Serializes compilation and table updates done by {@link #registerFilterClass}. */
+    private final Object compileLock = new Object();
+    private final FiltersrvController filtersrvController;
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
+    private final ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
+            new ConcurrentHashMap<String, FilterClassInfo>(128);
+    private FilterClassFetchMethod filterClassFetchMethod;
+
+
+    /**
+     * Creates a manager that fetches filter source over HTTP from the repository
+     * URL found in the controller's filter server configuration.
+     *
+     * @param filtersrvController owning controller, source of the configuration
+     */
+    public FilterClassManager(FiltersrvController filtersrvController) {
+        this.filtersrvController = filtersrvController;
+        this.filterClassFetchMethod =
+                new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
+                        .getFilterClassRepertoryUrl());
+    }
+
+
+    /**
+     * Starts the once-a-minute remote fetch task. Nothing is scheduled when
+     * client upload of filter classes is enabled.
+     */
+    public void start() {
+        if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                @Override
+                public void run() {
+                    fetchClassFromRemoteHost();
+                }
+            }, 1, 1, TimeUnit.MINUTES);
+        }
+    }
+
+    /**
+     * Re-fetches the source of every registered filter class and recompiles
+     * those whose checksum changed. A failure for one entry is logged and does
+     * not affect the remaining entries.
+     */
+    private void fetchClassFromRemoteHost() {
+        for (Entry<String, FilterClassInfo> entry : this.filterClassTable.entrySet()) {
+            // Throwable, not Exception: anything escaping from a task run by
+            // scheduleAtFixedRate suppresses every later execution, and compiling or
+            // loading a class can fail with an Error.
+            try {
+                final FilterClassInfo filterClassInfo = entry.getValue();
+                final String[] topicAndGroup = entry.getKey().split("@");
+                final String responseStr =
+                        this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
+                                filterClassInfo.getClassName());
+                if (null == responseStr) {
+                    // The fetch method reports failure by returning null; keep the
+                    // current filter and try again on the next run.
+                    log.warn("fetch Remote class File failed, no source returned, {} {}", entry.getKey(),
+                            filterClassInfo.getClassName());
+                    continue;
+                }
+
+                final byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
+                final int classCRC = UtilAll.crc32(filterSourceBinary);
+                if (classCRC != filterClassInfo.getClassCRC()) {
+                    String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
+                    Class<?> newClass =
+                            DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
+                    Object newInstance = newClass.newInstance();
+                    filterClassInfo.setMessageFilter((MessageFilter) newInstance);
+                    filterClassInfo.setClassCRC(classCRC);
+
+                    log.info("fetch Remote class File OK, {} {}", entry.getKey(),
+                            filterClassInfo.getClassName());
+                }
+            } catch (Throwable e) {
+                log.error("fetchClassFromRemoteHost Exception", e);
+            }
+        }
+    }
+
+    /** Stops the scheduled fetch task. */
+    public void shutdown() {
+        this.scheduledExecutorService.shutdown();
+    }
+
+    /**
+     * Registers the filter class of a consumer group for a topic.
+     * <p>
+     * A new entry is created when none exists for the key. An existing entry is
+     * replaced only when client upload is enabled and {@code classCRC} is
+     * non-zero and differs from the stored checksum. With client upload enabled
+     * the given source is compiled and instantiated; otherwise a placeholder
+     * (checksum 0, no filter) is stored for the scheduled fetch task to fill.
+     *
+     * @param consumerGroup      consumer group owning the filter
+     * @param topic              topic the filter applies to
+     * @param className          name of the filter class
+     * @param classCRC           checksum of the filter source supplied by the caller
+     * @param filterSourceBinary encoded filter source; only read when client upload is enabled
+     * @return {@code true} if the class is registered (newly or already),
+     *         {@code false} if compiling or instantiating it failed
+     */
+    public boolean registerFilterClass(final String consumerGroup, final String topic,
+                                       final String className, final int classCRC, final byte[] filterSourceBinary) {
+        final String key = buildKey(consumerGroup, topic);
+        final boolean clientUpload =
+                this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable();
+
+        final FilterClassInfo registered = this.filterClassTable.get(key);
+        final boolean registerNew;
+        if (null == registered) {
+            registerNew = true;
+        } else {
+            registerNew = clientUpload && classCRC != 0 && registered.getClassCRC() != classCRC;
+        }
+        if (!registerNew) {
+            return true;
+        }
+
+        synchronized (this.compileLock) {
+            // Re-check under the lock: another thread may have registered the key
+            // meanwhile. Without client upload an existing entry is never replaced
+            // here, otherwise a racing first registration would overwrite it with
+            // an empty placeholder.
+            final FilterClassInfo current = this.filterClassTable.get(key);
+            if (null != current && (!clientUpload || current.getClassCRC() == classCRC)) {
+                return true;
+            }
+
+            try {
+                FilterClassInfo filterClassInfoNew = new FilterClassInfo();
+                filterClassInfoNew.setClassName(className);
+                filterClassInfoNew.setClassCRC(0);
+                filterClassInfoNew.setMessageFilter(null);
+
+                if (clientUpload) {
+                    String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
+                    Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
+                    Object newInstance = newClass.newInstance();
+                    filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
+                    filterClassInfoNew.setClassCRC(classCRC);
+                }
+
+                this.filterClassTable.put(key, filterClassInfoNew);
+            } catch (Throwable e) {
+                String info =
+                        String
+                                .format(
+                                        "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
+                                        consumerGroup, topic, className);
+                log.error(info, e);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /** Builds the table key {@code topic@consumerGroup}. */
+    private static String buildKey(final String consumerGroup, final String topic) {
+        return topic + "@" + consumerGroup;
+    }
+
+    /**
+     * Looks up the filter class registered for a consumer group and topic.
+     *
+     * @return the registered entry, or {@code null} when there is none
+     */
+    public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) {
+        return this.filterClassTable.get(buildKey(consumerGroup, topic));
+    }
+
+
+    public FilterClassFetchMethod getFilterClassFetchMethod() {
+        return filterClassFetchMethod;
+    }
+
+
+    public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) {
+        this.filterClassFetchMethod = filterClassFetchMethod;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
new file mode 100644
index 0000000..88cb572
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.filter;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.utils.HttpTinyClient;
+import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link FilterClassFetchMethod} that downloads filter source over HTTP from
+ * {@code <url>/<className>.java}.
+ */
+public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+    /** Base URL of the source repository; the class file name is appended to it. */
+    private final String url;
+
+
+    public HttpFilterClassFetchMethod(String url) {
+        this.url = url;
+    }
+
+
+    /**
+     * Downloads the source of {@code className}.
+     *
+     * @param topic         used for log output only
+     * @param consumerGroup used for log output only
+     * @param className     class whose {@code .java} file is requested
+     * @return the response body when the server answers with code 200, otherwise
+     *         {@code null} (non-200 answer or any exception; both are logged)
+     */
+    @Override
+    public String fetch(String topic, String consumerGroup, String className) {
+        final String thisUrl = String.format("%s/%s.java", this.url, className);
+
+        try {
+            // NOTE(review): 5000 is presumably a timeout in milliseconds - confirm against HttpTinyClient.
+            HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000);
+            if (200 == result.code) {
+                return result.content;
+            }
+            // Leave a trace for non-200 answers instead of returning null silently.
+            log.warn(String.format("call <%s> failed, http code: %s Topic: %s Group: %s", thisUrl,
+                    result.code, topic, consumerGroup));
+        } catch (Exception e) {
+            log.error(
+                    String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
+        }
+
+        return null;
+    }
+}