You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/25 09:06:26 UTC

[rocketmq] branch snode updated: Add sql filter support for snode

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 4649a8e  Add sql filter support for snode
4649a8e is described below

commit 4649a8ed05dbbf107438af0799044fcf80c20e4d
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jan 25 17:05:46 2019 +0800

    Add sql filter support for snode
---
 .../processor/SnodePullMessageProcessor.java       |  4 +-
 .../client/consumer/DefaultMQPushConsumer.java     |  2 +-
 .../org/apache/rocketmq/snode/SnodeController.java |  1 +
 .../apache/rocketmq/snode/config/SnodeConfig.java  |  9 +++++
 .../snode/processor/HeartbeatProcessor.java        | 43 ++++++++++++++++++++++
 5 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
index 74a0176..ee67e21 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -49,10 +49,10 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -113,7 +113,7 @@ public class SnodePullMessageProcessor implements RequestProcessor {
             }
         } catch (Exception e) {
             log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
-                requestHeader.getConsumerGroup());
+                requestHeader.getConsumerGroup(), e);
             response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
             response.setRemark(e.getMessage());
             return response;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index d51030a..92c7c18 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -220,7 +220,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     /**
      * Whether update subscription relationship when every pull
      */
-    private boolean postSubscriptionWhenPull = false;
+    private boolean postSubscriptionWhenPull = true;
 
     /**
      * Whether the unit of subscription group
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 0707db5..8b6cabd 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -230,6 +230,7 @@ public class SnodeController {
         this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
         this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
         this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
+        this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
         this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
         this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
         this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index 3143d7d..941815a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -78,6 +78,7 @@ public class SnodeConfig {
     private int listenPort = 11911;
 
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+    private boolean enablePropertyFilter = true;
 
     public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
         this.snodeHeartBeatInterval = snodeHeartBeatInterval;
@@ -291,4 +292,12 @@ public class SnodeConfig {
     public void setSlowConsumerThreshold(int slowConsumerThreshold) {
         this.slowConsumerThreshold = slowConsumerThreshold;
     }
+
+    public boolean isEnablePropertyFilter() {
+        return enablePropertyFilter;
+    }
+
+    public void setEnablePropertyFilter(boolean enablePropertyFilter) {
+        this.enablePropertyFilter = enablePropertyFilter;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 36e27fc..a36704c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -21,14 +21,18 @@ import io.netty.util.Attribute;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.filter.FilterFactory;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
@@ -57,6 +61,8 @@ public class HeartbeatProcessor implements RequestProcessor {
                 return register(remotingChannel, request);
             case RequestCode.UNREGISTER_CLIENT:
                 return unregister(remotingChannel, request);
+            case RequestCode.CHECK_CLIENT_CONFIG:
+                return this.checkClientConfig(remotingChannel, request);
             default:
                 break;
         }
@@ -131,6 +137,43 @@ public class HeartbeatProcessor implements RequestProcessor {
         return response;
     }
 
+    public RemotingCommand checkClientConfig(RemotingChannel ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
+            CheckClientRequestBody.class);
+
+        if (requestBody != null && requestBody.getSubscriptionData() != null) {
+            SubscriptionData subscriptionData = requestBody.getSubscriptionData();
+
+            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+                return response;
+            }
+
+            if (!this.snodeController.getSnodeConfig().isEnablePropertyFilter()) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("The snode does not support consumer to filter message by " + subscriptionData.getExpressionType());
+                return response;
+            }
+
+            try {
+                FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
+            } catch (Exception e) {
+                log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
+                    requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());
+                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+                response.setRemark(e.getMessage());
+                return response;
+            }
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;