You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/25 09:06:26 UTC
[rocketmq] branch snode updated: Add sql filter support for snode
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 4649a8e Add sql filter support for snode
4649a8e is described below
commit 4649a8ed05dbbf107438af0799044fcf80c20e4d
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jan 25 17:05:46 2019 +0800
Add sql filter support for snode
---
.../processor/SnodePullMessageProcessor.java | 4 +-
.../client/consumer/DefaultMQPushConsumer.java | 2 +-
.../org/apache/rocketmq/snode/SnodeController.java | 1 +
.../apache/rocketmq/snode/config/SnodeConfig.java | 9 +++++
.../snode/processor/HeartbeatProcessor.java | 43 ++++++++++++++++++++++
5 files changed, 56 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
index 74a0176..ee67e21 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -49,10 +49,10 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -113,7 +113,7 @@ public class SnodePullMessageProcessor implements RequestProcessor {
}
} catch (Exception e) {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
- requestHeader.getConsumerGroup());
+ requestHeader.getConsumerGroup(), e);
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark(e.getMessage());
return response;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index d51030a..92c7c18 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -220,7 +220,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Whether update subscription relationship when every pull
*/
- private boolean postSubscriptionWhenPull = false;
+ private boolean postSubscriptionWhenPull = true;
/**
* Whether the unit of subscription group
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 0707db5..8b6cabd 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -230,6 +230,7 @@ public class SnodeController {
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index 3143d7d..941815a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -78,6 +78,7 @@ public class SnodeConfig {
private int listenPort = 11911;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+ private boolean enablePropertyFilter = true;
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
@@ -291,4 +292,12 @@ public class SnodeConfig {
public void setSlowConsumerThreshold(int slowConsumerThreshold) {
this.slowConsumerThreshold = slowConsumerThreshold;
}
+
+ public boolean isEnablePropertyFilter() { // Read by HeartbeatProcessor.checkClientConfig to decide whether non-tag subscription expressions are accepted.
+ return enablePropertyFilter;
+ }
+
+ public void setEnablePropertyFilter(boolean enablePropertyFilter) { // Overrides the flag; the field is initialized to true in this class.
+ this.enablePropertyFilter = enablePropertyFilter;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 36e27fc..a36704c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -21,14 +21,18 @@ import io.netty.util.Attribute;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
@@ -57,6 +61,8 @@ public class HeartbeatProcessor implements RequestProcessor {
return register(remotingChannel, request);
case RequestCode.UNREGISTER_CLIENT:
return unregister(remotingChannel, request);
+ case RequestCode.CHECK_CLIENT_CONFIG:
+ return this.checkClientConfig(remotingChannel, request);
default:
break;
}
@@ -131,6 +137,43 @@ public class HeartbeatProcessor implements RequestProcessor {
return response;
}
+ public RemotingCommand checkClientConfig(RemotingChannel ctx, RemotingCommand request) { // Handles CHECK_CLIENT_CONFIG: validates the client's subscription expression; ctx is not used here.
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ // The subscription to check travels in the request body.
+ CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
+ CheckClientRequestBody.class);
+ // Validation applies only when the body actually carries subscription data; otherwise fall through to SUCCESS.
+ if (requestBody != null && requestBody.getSubscriptionData() != null) {
+ SubscriptionData subscriptionData = requestBody.getSubscriptionData();
+ // Tag-type expressions are accepted as-is; nothing is compiled for them.
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+ // Non-tag expressions are rejected outright when the snode config has property filtering disabled.
+ if (!this.snodeController.getSnodeConfig().isEnablePropertyFilter()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("The snode does not support consumer to filter message by " + subscriptionData.getExpressionType());
+ return response;
+ }
+ // Compile the expression purely as a validity check; the compiled result is discarded.
+ try {
+ FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
+ } catch (Exception e) { // NOTE(review): presumably this also catches an NPE if no filter is registered for the type, leaving a null remark — confirm.
+ log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
+ requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage()); // Only the message is logged; the stack trace is not.
+ response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+ }
+ // Reached when there was nothing to validate or the expression compiled without throwing.
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
@Override
public boolean rejectRequest() {
return false;